Mengatur pemantauan kustom jalur penerapan DLT dengan pengait kejadian
Anda dapat menggunakan kait peristiwa untuk menambahkan fungsi panggilan balik Python kustom yang berjalan saat peristiwa dipertahankan ke log peristiwa alur DLT. Anda dapat menggunakan kait peristiwa untuk mengimplementasikan solusi pemantauan dan pemberitahuan kustom. Misalnya, Anda dapat menggunakan hook peristiwa untuk mengirim email atau menulis ke log saat peristiwa tertentu terjadi atau untuk berintegrasi dengan solusi pihak ketiga untuk memantau peristiwa alur.
Anda menentukan hook peristiwa dengan fungsi Python yang menerima satu argumen, di mana argumen adalah kamus yang mewakili peristiwa. Anda kemudian menyertakan event hooks sebagai bagian dari kode sumber untuk pipeline. Setiap kait peristiwa yang ditentukan dalam alur akan mencoba memproses semua peristiwa yang dihasilkan selama setiap pembaruan alur. Jika pipeline Anda terdiri dari beberapa artefak kode sumber, misalnya beberapa notebook, maka setiap event hook yang ditentukan diterapkan ke seluruh pipeline. Meskipun kait peristiwa disertakan dalam kode sumber untuk alur Anda, kait tersebut tidak disertakan dalam grafik alur.
Anda dapat menggunakan pengait acara dengan alur kerja yang diterbitkan ke Metastore Apache Hive atau Katalog Unity.
Nota
- Python adalah satu-satunya bahasa yang didukung untuk menentukan kait peristiwa. Untuk menentukan fungsi Python kustom yang memproses peristiwa dalam alur yang diimplementasikan menggunakan antarmuka SQL, tambahkan fungsi kustom dalam buku catatan Python terpisah yang berjalan sebagai bagian dari alur. Fungsi Python diterapkan ke seluruh alur saat alur berjalan.
- Pengait peristiwa dipicu hanya untuk peristiwa ketika tingkat kematangan adalah
STABLE
. - Kait peristiwa dijalankan secara asinkron dengan pembaruan proses tetapi secara sinkron dengan kait peristiwa lainnya. Ini berarti bahwa hanya satu hook acara yang berjalan pada satu waktu, dan hook acara lainnya menunggu dieksekusi hingga hook acara tersebut selesai. Jika sebuah event hook berjalan tak terbatas, itu memblokir semua event hook lainnya.
- DLT mencoba menjalankan setiap kait peristiwa pada setiap peristiwa yang dipancarkan selama pembaruan alur. Untuk membantu memastikan bahwa kait peristiwa yang tertinggal memiliki waktu untuk memproses semua peristiwa antrean, DLT menunggu periode tetap yang tidak dapat dikonfigurasi sebelum mengakhiri komputasi yang menjalankan alur. Namun, tidak dijamin bahwa semua kait dipicu pada semua peristiwa sebelum komputasi dihentikan.
Memantau pemrosesan kait peristiwa
Gunakan jenis peristiwa hook_progress
di log peristiwa DLT untuk memantau status kait peristiwa pembaruan. Untuk mencegah dependensi melingkar, kait peristiwa tidak dipicu untuk peristiwa hook_progress
.
Tentukan peristiwa hook
Untuk menentukan event hook, gunakan dekorator on_event_hook
:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures
menggambarkan jumlah maksimum kali berturut-turut sebuah kait peristiwa dapat gagal sebelum dinonaktifkan. Kegagalan kait peristiwa didefinisikan sebagai kapan saja kait peristiwa melemparkan pengecualian. Jika hook peristiwa dinonaktifkan, itu tidak memproses peristiwa baru sampai alur dimulai ulang.
max_allowable_consecutive_failures
harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0
atau None
. Nilai None
(ditetapkan secara default) berarti jumlah kegagalan berturut-turut yang diizinkan untuk hook peristiwa tidak terbatas, dan hook peristiwa tidak pernah dinonaktifkan.
Kegagalan kait peristiwa dan penonaktifan kait peristiwa dapat dipantau dalam log peristiwa sebagai peristiwa hook_progress
.
Fungsi hook peristiwa harus berupa fungsi Python yang menerima tepat satu parameter, yaitu representasi dalam bentuk kamus dari peristiwa yang memicu fungsi hook peristiwa ini. Nilai pengembalian apa pun dari fungsi kait peristiwa diabaikan.
Contoh: Pilih peristiwa tertentu untuk diproses
Contoh berikut menunjukkan kait peristiwa yang memilih peristiwa tertentu untuk diproses. Khususnya, contoh ini menunggu hingga peristiwa alur STOPPING
diterima dan kemudian mengeluarkan pesan ke log pengemudi stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Contoh: Mengirim semua peristiwa ke saluran Slack
Contoh berikut mengimplementasikan kait peristiwa yang mengirim semua peristiwa yang diterima ke saluran Slack menggunakan API Slack.
Contoh ini menggunakan databricks rahasia untuk menyimpan token yang diperlukan untuk mengautentikasi ke API Slack dengan aman.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Contoh: Mengonfigurasi hook peristiwa untuk dinonaktifkan setelah empat kegagalan berturut-turut
Contoh berikut menunjukkan cara mengonfigurasi kait peristiwa yang dinonaktifkan jika gagal secara berturut-turut empat kali.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Contoh: Alur DLT dengan hook kejadian
Contoh berikut menunjukkan penambahan kait peristiwa ke kode sumber untuk alur. Ini adalah contoh sederhana tetapi lengkap menggunakan kait peristiwa dengan alur.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})