DLT-folyamatok egyéni monitorozásának definiálása eseményhookokkal
Fontos
Az eseményhorgok támogatása a nyilvános előzetes verzióban.
Az eseményhookok használatával olyan egyéni Python-visszahívási függvényeket adhat hozzá, amelyek akkor futnak le, amikor az események bekerülnek egy DLT-folyamat eseménynaplójába. Az eseményhookok használatával egyéni monitorozási és riasztási megoldásokat valósíthat meg. Az eseményhookok használatával például e-maileket küldhet vagy írhat egy naplóba adott események bekövetkezésekor, vagy integrálható külső megoldásokkal a folyamatesemények figyeléséhez.
Olyan eseményhookot definiálhat egy Python-függvénnyel, amely egyetlen argumentumot fogad el, ahol az argumentum egy eseményt jelképező szótár. Ezután az eseményhookokat a pipeline forráskódjába illeszti. A folyamatban definiált eseményhohogok megpróbálják feldolgozni az egyes folyamatfrissítések során létrehozott összes eseményt. Ha a folyamat több forráskód-összetevőből, például több jegyzetfüzetből áll, a rendszer minden definiált eseményhookot a teljes folyamatra alkalmaz. Bár az eseményhogok szerepelnek a folyamat forráskódjában, ezek nem szerepelnek a folyamatgráfban.
Az eseményhohogokat olyan folyamatokkal használhatja, amelyek közzétehetők a Hive metaadattárában vagy a Unity-katalógusban.
Jegyzet
- A Python az egyetlen támogatott nyelv az eseményhookok meghatározásához. Ha egyéni Python-függvényeket szeretne definiálni, amelyek az SQL-felületen implementált folyamat eseményeit dolgozzák fel, adja hozzá az egyéni függvényeket egy külön Python-jegyzetfüzethez, amely a folyamat részeként fut. A Python-függvények a folyamat futtatásakor a teljes folyamatra lesznek alkalmazva.
- Az eseményhookok csak olyan események esetén aktiválódnak, amelyekben a maturity_level
STABLE
. - Az eseményhookokat aszinkron módon kerülnek végrehajtásra a folyamatfrissítések során, de szinkron módon más eseményhookokkal. Ez azt jelenti, hogy egyszerre csak egyetlen eseményhook fut, más eseményhookok pedig addig várnak, amíg az éppen futó eseményhook befejeződik. Ha egy eseményhohorog határozatlan ideig fut, letiltja az összes többi eseményhookot.
- A DLT minden eseménykampó futtatását megkísérli a folyamatfrissítés során kibocsátott összes eseményen. Annak biztosítása érdekében, hogy az elmaradt eseményhohogoknak legyen ideje az összes várólistás esemény feldolgozására, a DLT egy nem konfigurálható rögzített időszakot vár, mielőtt leállítanák a folyamatot futtató számítást. Azonban nem garantált, hogy az összes horog aktiválódik az összes eseményen a számítás leállása előtt.
Eseménykampó feldolgozásának figyelése
A DLT-eseménynapló hook_progress
eseménytípusával monitorozza a frissítés eseményhookainak állapotát. A körkörös függőségek elkerülése érdekében az eseményhookok nem aktiválódnak hook_progress
eseményekhez.
Eseménycsatoló definiálása
Eseményhook definiálásához használja a on_event_hook
dekoratőrt:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
A max_allowable_consecutive_failures
azt írja le, hogy egy eseményhook legfeljebb hány egymást követő alkalommal hiúsulhat meg, mielőtt az le lenne tiltva. Az eseményhook meghibásodása akkor van definiálva, amikor az eseményhook kivételt okoz. Ha egy eseményhohorog le van tiltva, a folyamat újraindításáig nem dolgoz fel új eseményeket.
max_allowable_consecutive_failures
0
vagy None
értéknél nagyobb vagy egyenlő egész számnak kell lennie. A None
értéke (alapértelmezés szerint hozzárendelve) azt jelenti, hogy nincs korlátozva az eseményhook számára engedélyezett egymást követő hibák száma, és az eseményhook soha nem lesz letiltva.
Az eseményhook hibái és az eseményhookok letiltása az eseménynaplóban hook_progress
eseményként figyelhető meg.
Az eseményhorog függvénynek olyan Python-függvénynek kell lennie, amely pontosan egy paramétert fogad el, amely az eseményhoomot kiváltó esemény szótári ábrázolása. Az eseményhook függvény bármely visszatérési értéke figyelmen kívül lesz hagyva.
Példa: Adott események kiválasztása feldolgozásra
Az alábbi példa egy eseményhookot mutat be, amely meghatározott eseményeket választ ki feldolgozásra. Ez a példa konkrétan megvárja, amíg a csővezeték STOPPING
eseményei megérkeznek, majd üzenetet küld az illesztőprogram naplójába stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Példa: Az összes esemény küldése Slack-csatornára
Az alábbi példa egy eseményhookot implementál, amely a Slack API használatával elküldi az összes fogadott eseményt egy Slack-csatornára.
Ez a példa egy Databricks-titkot használ arra, hogy a Slack API-hoz szükséges token biztonságosan tárolásra kerüljön.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Példa: Konfiguráljon egy eseményhookot négy egymást követő hiba letiltásához
Az alábbi példa bemutatja, hogyan konfigurálhat egy letiltott eseményhookot, ha az egymást követő négy alkalommal meghiúsul.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Példa: Egy DLT-folyamat eseményhook használatával
Az alábbi példa egy eseményhook hozzáadását mutatja be egy folyamat forráskódjához. Ez egy egyszerű, de teljes példája az eseményhookok adatfolyam szerinti használatának.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})