Megosztás a következőn keresztül:


DLT-folyamatok egyéni monitorozásának definiálása eseményhookokkal

Fontos

Az eseményhorgok támogatása a nyilvános előzetes verzióban.

Az eseményhookok használatával olyan egyéni Python-visszahívási függvényeket adhat hozzá, amelyek akkor futnak le, amikor az események bekerülnek egy DLT-folyamat eseménynaplójába. Az eseményhookok használatával egyéni monitorozási és riasztási megoldásokat valósíthat meg. Az eseményhookok használatával például e-maileket küldhet vagy írhat egy naplóba adott események bekövetkezésekor, vagy integrálható külső megoldásokkal a folyamatesemények figyeléséhez.

Olyan eseményhookot definiálhat egy Python-függvénnyel, amely egyetlen argumentumot fogad el, ahol az argumentum egy eseményt jelképező szótár. Ezután az eseményhookokat a pipeline forráskódjába illeszti. A folyamatban definiált eseményhohogok megpróbálják feldolgozni az egyes folyamatfrissítések során létrehozott összes eseményt. Ha a folyamat több forráskód-összetevőből, például több jegyzetfüzetből áll, a rendszer minden definiált eseményhookot a teljes folyamatra alkalmaz. Bár az eseményhogok szerepelnek a folyamat forráskódjában, ezek nem szerepelnek a folyamatgráfban.

Az eseményhohogokat olyan folyamatokkal használhatja, amelyek közzétehetők a Hive metaadattárában vagy a Unity-katalógusban.

Jegyzet

  • A Python az egyetlen támogatott nyelv az eseményhookok meghatározásához. Ha egyéni Python-függvényeket szeretne definiálni, amelyek az SQL-felületen implementált folyamat eseményeit dolgozzák fel, adja hozzá az egyéni függvényeket egy külön Python-jegyzetfüzethez, amely a folyamat részeként fut. A Python-függvények a folyamat futtatásakor a teljes folyamatra lesznek alkalmazva.
  • Az eseményhookok csak olyan események esetén aktiválódnak, amelyekben a maturity_levelSTABLE.
  • Az eseményhookokat aszinkron módon kerülnek végrehajtásra a folyamatfrissítések során, de szinkron módon más eseményhookokkal. Ez azt jelenti, hogy egyszerre csak egyetlen eseményhook fut, más eseményhookok pedig addig várnak, amíg az éppen futó eseményhook befejeződik. Ha egy eseményhohorog határozatlan ideig fut, letiltja az összes többi eseményhookot.
  • A DLT minden eseménykampó futtatását megkísérli a folyamatfrissítés során kibocsátott összes eseményen. Annak biztosítása érdekében, hogy az elmaradt eseményhohogoknak legyen ideje az összes várólistás esemény feldolgozására, a DLT egy nem konfigurálható rögzített időszakot vár, mielőtt leállítanák a folyamatot futtató számítást. Azonban nem garantált, hogy az összes horog aktiválódik az összes eseményen a számítás leállása előtt.

Eseménykampó feldolgozásának figyelése

A DLT-eseménynapló hook_progress eseménytípusával monitorozza a frissítés eseményhookainak állapotát. A körkörös függőségek elkerülése érdekében az eseményhookok nem aktiválódnak hook_progress eseményekhez.

Eseménycsatoló definiálása

Eseményhook definiálásához használja a on_event_hook dekoratőrt:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

A max_allowable_consecutive_failures azt írja le, hogy egy eseményhook legfeljebb hány egymást követő alkalommal hiúsulhat meg, mielőtt az le lenne tiltva. Az eseményhook meghibásodása akkor van definiálva, amikor az eseményhook kivételt okoz. Ha egy eseményhohorog le van tiltva, a folyamat újraindításáig nem dolgoz fel új eseményeket.

max_allowable_consecutive_failures 0 vagy Noneértéknél nagyobb vagy egyenlő egész számnak kell lennie. A None értéke (alapértelmezés szerint hozzárendelve) azt jelenti, hogy nincs korlátozva az eseményhook számára engedélyezett egymást követő hibák száma, és az eseményhook soha nem lesz letiltva.

Az eseményhook hibái és az eseményhookok letiltása az eseménynaplóban hook_progress eseményként figyelhető meg.

Az eseményhorog függvénynek olyan Python-függvénynek kell lennie, amely pontosan egy paramétert fogad el, amely az eseményhoomot kiváltó esemény szótári ábrázolása. Az eseményhook függvény bármely visszatérési értéke figyelmen kívül lesz hagyva.

Példa: Adott események kiválasztása feldolgozásra

Az alábbi példa egy eseményhookot mutat be, amely meghatározott eseményeket választ ki feldolgozásra. Ez a példa konkrétan megvárja, amíg a csővezeték STOPPING eseményei megérkeznek, majd üzenetet küld az illesztőprogram naplójába stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Példa: Az összes esemény küldése Slack-csatornára

Az alábbi példa egy eseményhookot implementál, amely a Slack API használatával elküldi az összes fogadott eseményt egy Slack-csatornára.

Ez a példa egy Databricks-titkot használ arra, hogy a Slack API-hoz szükséges token biztonságosan tárolásra kerüljön.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Példa: Konfiguráljon egy eseményhookot négy egymást követő hiba letiltásához

Az alábbi példa bemutatja, hogyan konfigurálhat egy letiltott eseményhookot, ha az egymást követő négy alkalommal meghiúsul.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Példa: Egy DLT-folyamat eseményhook használatával

Az alábbi példa egy eseményhook hozzáadását mutatja be egy folyamat forráskódjához. Ez egy egyszerű, de teljes példája az eseményhookok adatfolyam szerinti használatának.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })