Elaborazione del flusso con Apache Kafka e Azure Databricks
Questo articolo descrive come usare Apache Kafka come origine o sink durante l'esecuzione di carichi di lavoro Structured Streaming in Azure Databricks.
Per altre informazioni su Kafka, vedere la documentazione di Kafka.
Leggere dati da Kafka
Di seguito è riportato un esempio di streaming letto da Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks supporta anche la semantica di lettura batch per le origini dati Kafka, come illustrato nell'esempio seguente:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Per il caricamento batch incrementale, Databricks consiglia di usare Kafka con Trigger.AvailableNow
. Si veda Configurazione dell'elaborazione batch incrementale.
In Databricks Runtime 13.3 LTS e versioni successive, Azure Databricks fornisce una funzione SQL per la lettura dei dati Kafka. Lo streaming con SQL è supportato solo in DLT o con tabelle di streaming in Databricks SQL. Vedere read_kafka
funzione con valori di tabella.
Configurare il lettore di streaming strutturato Kafka
Azure Databricks fornisce la parola chiave kafka
come formato dati per configurare le connessioni a Kafka 0.10+.
Le seguenti sono configurazioni comuni per Kafka:
Esistono diversi modi per specificare gli argomenti da sottoscrivere. È consigliabile specificare solo uno di questi parametri:
Opzione | valore | Descrizione |
---|---|---|
iscriviti | Elenco di argomenti delimitato da virgole. | Elenco di argomenti a cui sottoscrivere. |
modelloDiSottoscrizione | Stringa di espressione regolare di Java. | Modello utilizzato per iscriversi agli argomento/i. |
assegnare | Stringa JSON {"topicA":[0,1],"topic":[2,4]} . |
Partizioni di argomenti specifiche da utilizzare. |
Altre configurazioni rilevanti:
Opzione | valore | Valore predefinito | Descrizione |
---|---|---|---|
kafka.bootstrap.servers | Elenco delimitato da virgole di host:port. | vuoto | [Necessario] Configurazione Kafka bootstrap.servers . Se non sono presenti dati da Kafka, controllare prima l'elenco di indirizzi del broker. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. Ciò è dovuto al fatto che il client Kafka presuppone che alla fine i broker diventino disponibili e, in caso di errori di rete, continui a riprovare indefinitamente. |
failOnDataLoss |
true o false . |
true |
[Facoltativo] Se la query deve fallire quando è possibile che i dati siano andati persi. Le query possono non riuscire a leggere in modo permanente i dati da Kafka a causa di molti scenari, ad esempio argomenti eliminati, troncamento degli argomenti prima dell'elaborazione e così via. Si tenta di stimare in modo conservativo se i dati sono stati probabilmente persi o meno. A volte questo può causare falsi allarmi. Impostare questa opzione su false se non funziona come previsto o si vuole che la query continui l'elaborazione nonostante la perdita di dati. |
minPartitions | Valore intero >= 0, 0 = disabilitato. | 0 (disabilitata) | [Facoltativo] Numero minimo di partizioni da leggere da Kafka. È possibile configurare Spark per usare un minimo arbitrario di partizioni da leggere da Kafka usando l'opzione minPartitions . In genere, Spark ha una mappatura 1-1 delle partizioni dei topic di Kafka alle partizioni di Spark che consumano dati da Kafka. Se si imposta l'opzione minPartitions su un valore maggiore del numero di partizioni del topic Kafka, Spark suddividerà le grandi partizioni Kafka in parti più piccole. Questa opzione può essere impostata in momenti di picco di caricamento, sfasamento dei dati e man mano che il flusso è in ritardo per aumentare la velocità di elaborazione. Comporta un costo di inizializzazione dei consumatori Kafka a ogni attivazione, che può influire sulle prestazioni se si utilizza SSL durante la connessione a Kafka. |
kafka.group.id | ID del gruppo di consumatori Kafka. | non impostato | [Facoltativo] ID gruppo da usare durante la lettura da Kafka. Usare con cautela. Per impostazione predefinita, ogni query genera un ID di gruppo univoco per la lettura dei dati. In questo modo, ogni query ha un proprio gruppo di consumatori che non subisce interferenze da parte di nessun altro consumatore e pertanto può leggere tutte le partizioni dei topic sottoscritti. In alcuni scenari, ad esempio l'autorizzazione basata su gruppo Kafka, è possibile usare ID gruppo autorizzati specifici per leggere i dati. Facoltativamente, è possibile impostare l'ID gruppo. Tuttavia, eseguire questa operazione con estrema cautela perché può causare comportamenti imprevisti.
|
startingOffsets | più antico, più recente | più recente | [Facoltativo] Il punto di partenza quando viene avviata una query, o "earliest", che indica gli offset più remoti, o una stringa JSON che specifica un offset iniziale per ciascun TopicPartition. Nel codice JSON, -2 come offset può essere usato per fare riferimento al primo termine disponibile e -1 all'ultimo termine disponibile. Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in json) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query e la ripresa continuerà sempre da dove la query è stata interrotta. Le nuove partizioni individuate durante una query inizieranno al più presto possibile. |
Per altre configurazioni facoltative, vedere Guida all'integrazione di Structured Streaming Kafka.
Schema dei Kafka record
Lo schema dei record Kafka è:
Colonna | Tipo |
---|---|
chiave | binario |
valore | binario |
argomento | string |
partizione | int |
offset | lungo |
timestamp | lungo |
tipo di timestamp | int |
key
e value
vengono sempre deserializzati come matrici di byte con ByteArrayDeserializer
. Usare operazioni dataframe (ad esempio cast("string")
) per deserializzare in modo esplicito le chiavi e i valori.
Scrivere dati in Kafka
Di seguito è riportato un esempio di scrittura di streaming in Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks supporta anche la semantica di scrittura batch nei sink di dati Kafka, come illustrato nell'esempio seguente:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Configurare lo scrittore di streaming strutturato Kafka
Importante
Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients
che abilita le scritture idempotenti per impostazione predefinita. Se un sink Kafka usa la versione 2.8.0 o successiva con ACL configurati, ma senza IDEMPOTENT_WRITE
abilitato, la scrittura non riesce con il messaggio di errore org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Risolvere questo errore eseguendo l'aggiornamento a Kafka versione 2.8.0 o successiva oppure impostando .option(“kafka.enable.idempotence”, “false”)
durante la configurazione del writer Structured Streaming.
Lo schema fornito a DataStreamWriter interagisce con il sink di Kafka. Puoi usare i seguenti campi:
Nome colonna | Obbligatorio o facoltativo | Tipo |
---|---|---|
key |
facoltativo |
STRING oppure BINARY |
value |
Obbligatorio |
STRING oppure BINARY |
headers |
facoltativo | ARRAY |
topic |
facoltativo (ignorato se topic è impostato come opzione di scrittura) |
STRING |
partition |
facoltativo | INT |
Di seguito sono riportate le opzioni comuni configurate durante la scrittura su Kafka:
Opzione | valore | Valore predefinito | Descrizione |
---|---|---|---|
kafka.boostrap.servers |
Elenco delimitato da virgole di <host:port> |
Nessuno | [Necessario] Configurazione Kafka bootstrap.servers . |
topic |
STRING |
non impostato | [Facoltativo] Imposta l'argomento per tutte le righe da scrivere. Questa opzione esegue l'override di qualsiasi colonna di argomento presente nei dati. |
includeHeaders |
BOOLEAN |
false |
[Opzionale] Specifica se includere le intestazioni di Kafka nella riga. |
Per altre configurazioni facoltative, vedere Guida all'integrazione di Structured Streaming Kafka.
Ottenere le metriche di Kafka
È possibile ottenere la media, il minimo e il massimo del numero di offset per i quali la streaming query è in ritardo rispetto all'offset più recente disponibile tra tutti i topic sottoscritti, utilizzando le metriche avgOffsetsBehindLatest
, maxOffsetsBehindLatest
e minOffsetsBehindLatest
. Vedere Lettura interattiva delle metriche.
Nota
Disponibile in Databricks Runtime 9.1 e versioni successive.
Ottenere il numero totale stimato di byte che il processo di query non ha utilizzato dagli argomenti sottoscritti esaminando il valore di estimatedTotalBytesBehindLatest
. Questa stima è basata sui batch elaborati negli ultimi 300 secondi. L'intervallo di tempo su cui si basa la stima può essere modificato impostando l'opzione bytesEstimateWindowLength
su un valore diverso. Ad esempio, per impostarlo su 10 minuti:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Se si esegue lo streaming in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard della query di streaming:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Usare SSL per connettere Azure Databricks a Kafka
Per abilitare le connessioni SSL a Kafka, seguire le istruzioni nella documentazione di Confluent Crittografia e autenticazione con SSL. È possibile specificare le configurazioni descritte, precedute da kafka.
, come opzioni. Ad esempio, puoi specificare il percorso del trust store nella proprietà kafka.ssl.truststore.location
.
Databricks consiglia:
- Archiviare i certificati nell'archiviazione di oggetti cloud. È possibile limitare l'accesso ai certificati solo ai cluster che possono accedere a Kafka. Vedere Governance dei dati con Unity Catalog.
- Archiviare le password del certificato come segreti in un ambito segreto.
L'esempio seguente usa i percorsi di archiviazione degli oggetti e i segreti di Databricks per abilitare una connessione SSL:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Connettere Kafka in HDInsight ad Azure Databricks
Creare un cluster Kafka in HDInsight.
Vedere Connettersi a Kafka in HDInsight tramite una rete virtuale di Azure per le istruzioni.
Configurare i broker Kafka per pubblicizzare l'indirizzo corretto.
Seguire le istruzioni in Configurare Kafka per la pubblicità IP. Se gestisci Kafka su macchine virtuali di Azure, assicurati che la configurazione
advertised.listeners
dei broker sia impostata sull'indirizzo IP interno degli host.Creare un cluster di Azure Databricks.
Collegare il cluster Kafka al cluster Azure Databricks.
Seguire le istruzioni in Reti peer virtuali.
Autenticazione del Principal del Servizio con Microsoft Entra ID e Azure Event Hubs
Azure Databricks supporta l'autenticazione dei processi Spark con i servizi di Hub eventi. Questa autenticazione viene eseguita tramite OAuth con Microsoft Entra ID.
Azure Databricks supporta l'autenticazione con ID Microsoft Entra con un ID client e un segreto negli ambienti di calcolo seguenti:
- Databricks Runtime 12.2 LTS e versioni successive sui sistemi di calcolo configurati con modalità di accesso dedicato (in precedenza modalità di accesso utente singolo).
- Databricks Runtime 14.3 LTS e versioni successive sulla computazione configurata con la modalità di accesso standard (in precedenza modalità di accesso condiviso).
- Pipeline DLT configurate senza Catalogo Unity.
Azure Databricks non supporta l'autenticazione con ID Entra di Microsoft con un certificato in qualsiasi ambiente di calcolo o nelle pipeline DLT configurate con Unity Catalog.
Questa autenticazione non funziona nel calcolo con modalità di accesso standard o in DLT del catalogo Unity.
Configurazione del connettore Kafka Structured Streaming
Per eseguire l'autenticazione con Microsoft Entra ID, sono necessari i valori seguenti:
Un ID del tenant. È possibile trovarla nella scheda Servizi Microsoft Entra ID.
ID cliente (noto anche come ID dell'applicazione).
Un segreto del cliente. Una volta ottenuto questo, è necessario aggiungerlo come segreto all'area di lavoro di Databricks. Per aggiungere questo segreto, vedere Gestione dei segreti.
Argomento di EventHubs. È possibile trovare un elenco di argomenti nella sezione Hub degli Eventi sotto la sezione Entità su una pagina specifica dello spazio dei nomi di Event Hubs. Per lavorare con più argomenti, è possibile impostare il ruolo IAM a livello di Event Hubs.
Un server EventHubs. È possibile trovarla nella pagina di panoramica del namespace di Event Hubs specifico:
Inoltre, per usare Entra ID, è necessario indicare a Kafka di usare il meccanismo SASL OAuth (SASL è un protocollo generico e OAuth è un tipo di "meccanismo" SASL):
-
kafka.security.protocol
deve essereSASL_SSL
-
kafka.sasl.mechanism
deve essereOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
dovrebbe essere un nome completamente qualificato della classe Java con un valore dikafkashaded
per il gestore di callback di login della nostra classe Kafka shaded. Vedere l'esempio seguente per la classe esatta.
Esempio
Si esaminerà quindi un esempio in esecuzione:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Gestione degli errori potenziali
Le opzioni di streaming non sono supportate.
Se si tenta di usare questo meccanismo di autenticazione in una pipeline DLT configurata con il catalogo Unity, è possibile che venga visualizzato l'errore seguente:
Per risolvere questo errore, usare una configurazione di calcolo supportata. Vedere Autenticazione entità servizio con Microsoft Entra ID e Azure Event Hub.
Non è stato possibile creare un nuovo
KafkaAdminClient
.Si tratta di un errore interno che Kafka genera se una delle opzioni di autenticazione seguenti non è corretta:
- ID client (detto anche ID applicazione)
- ID del locatario
- Server di EventHubs
Per risolvere l'errore, verificare che i valori siano corretti per queste opzioni.
Inoltre, è possibile che venga visualizzato questo errore se si modificano le opzioni di configurazione fornite per impostazione predefinita nell'esempio (a cui è stato chiesto di non modificare), ad esempio
kafka.security.protocol
.Non vengono restituiti record
Se si sta tentando di visualizzare o elaborare il dataframe ma non si ottengono risultati, nell'interfaccia utente verrà visualizzato quanto segue.
Questo messaggio indica che l'autenticazione ha avuto esito positivo, ma EventHubs non ha restituito dati. Alcuni possibili motivi (anche se non esaustivi) sono:
- È stato specificato l'argomento EventHubs errato.
- L'opzione di configurazione Kafka predefinita per
startingOffsets
èlatest
e attualmente non si ricevono dati tramite l'argomento. È possibile impostarestartingOffsetstoearliest
per iniziare a leggere i dati a partire dai primi offset di Kafka.