Definir a monitorização personalizada das pipelines DLT com ganchos de eventos
Importante
O suporte para ganchos de eventos está no Public Preview.
Você pode usar ganchos de eventos para adicionar funções de retorno de chamada Python personalizadas que são executadas quando os eventos são persistidos no log de eventos de um pipeline DLT. Você pode usar ganchos de eventos para implementar soluções personalizadas de monitoramento e alerta. Por exemplo, você pode usar ganchos de eventos para enviar e-mails ou gravar em um log quando eventos específicos ocorrerem ou para integrar com soluções de terceiros para monitorar eventos de pipeline.
Você define um gancho de eventos com uma função Python que aceita um único argumento, onde o argumento é um dicionário que representa um evento. Em seguida, você inclui os ganchos de evento como parte do código-fonte de um pipeline. Ganchos de eventos definidos num pipeline tentarão processar todos os eventos gerados durante cada atualização do pipeline. Se o pipeline for composto por vários artefatos de código-fonte, por exemplo, vários blocos de anotações, todos os ganchos de eventos definidos serão aplicados a todo o pipeline. Embora os ganchos de eventos estejam incluídos no código-fonte do seu pipeline, eles não estão representados no diagrama do pipeline.
Você pode usar ganchos de eventos com pipelines que publicam no metastore do Hive ou no Unity Catalog.
Observação
- Python é a única linguagem suportada para definir ganchos de eventos. Para definir funções Python personalizadas que processam eventos em um pipeline implementado usando a interface SQL, adicione as funções personalizadas em um bloco de anotações Python separado que é executado como parte do pipeline. As funções Python são aplicadas a todo o pipeline durante a sua execução.
- Os ganchos de eventos são acionados apenas para eventos em que o maturity_level é
STABLE
. - Os ganchos de eventos são executados de forma assíncrona a partir de atualizações de pipeline, mas de forma síncrona com outros ganchos de eventos. Isso significa que apenas um único gancho de evento é executado de cada vez, e outros ganchos de evento aguardam para serem executados até que o gancho de evento em execução seja concluído. Se um gancho de evento for executado indefinidamente, ele bloqueará todos os outros ganchos de evento.
- A DLT tenta executar cada gancho de evento em cada evento emitido durante uma atualização de pipeline. Para ajudar a garantir que os ganchos de eventos atrasados tenham tempo para processar todos os eventos enfileirados, a DLT aguarda um período fixo, que não pode ser configurado, antes de encerrar a operação da computação que executa o pipeline. No entanto, não é garantido que todos os ganchos sejam acionados em todos os eventos antes que a computação seja encerrada.
Monitorizar o processamento dos ganchos de eventos
Use o tipo de evento hook_progress
no log de eventos DLT para monitorar o estado dos ganchos de eventos de uma atualização. Para evitar dependências circulares, os hooks de eventos não são acionados para eventos hook_progress
.
Definir um gancho de evento
Para definir um gancho de evento, use o on_event_hook
decorador:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
O max_allowable_consecutive_failures
descreve o número máximo de vezes consecutivas que um gancho de evento pode falhar antes de ser desativado. Uma falha de gancho de evento é definida como sempre que o gancho de evento lança uma exceção. Se um gancho de eventos estiver desabilitado, ele não processará novos eventos até que o pipeline seja reiniciado.
max_allowable_consecutive_failures
deve ser um número inteiro maior ou igual a 0
ou None
. Um valor de None
(atribuído por padrão) significa que não há limite para o número de falhas consecutivas permitidas para o gancho de eventos e o gancho de eventos nunca é desabilitado.
As falhas nos ganchos de eventos e a desativação dos ganchos de eventos podem ser monitoradas no log de eventos como eventos hook_progress
.
A função de gancho de eventos deve ser uma função Python que aceite exatamente um parâmetro, uma representação de dicionário do evento que disparou esse gancho de eventos. Qualquer valor de retorno da função de gancho de eventos é ignorado.
Exemplo: Selecionar eventos específicos para processamento
O exemplo a seguir demonstra um gancho de eventos que seleciona eventos específicos para processamento. Especificamente, este exemplo aguarda até que os eventos de pipeline STOPPING
sejam recebidos e, em seguida, envia uma mensagem nos logs do driver stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Exemplo: Enviar todos os eventos para um canal do Slack
O exemplo a seguir implementa um gancho de eventos que envia todos os eventos recebidos para um canal do Slack usando a API do Slack.
Este exemplo usa um Databricks secreto para armazenar com segurança um token necessário para autenticar na API do Slack.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Exemplo: Configurar um gatilho de evento para desativar após quatro falhas consecutivas
O exemplo a seguir demonstra como configurar um gancho de eventos que é desabilitado se ele falhar consecutivamente quatro vezes.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Exemplo: um pipeline de DLT com um gancho de eventos
O exemplo a seguir demonstra a adição de um gancho de evento ao código-fonte de um pipeline. Este é um exemplo simples, mas completo, de como usar hooks de eventos com um pipeline.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})