Condividi tramite


Elaborazione del flusso con Apache Kafka e Azure Databricks

Questo articolo descrive come usare Apache Kafka come origine o sink durante l'esecuzione di carichi di lavoro Structured Streaming in Azure Databricks.

Per altre informazioni su Kafka, vedere la documentazione di Kafka.

Leggere dati da Kafka

Di seguito è riportato un esempio di streaming letto da Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks supporta anche la semantica di lettura batch per le origini dati Kafka, come illustrato nell'esempio seguente:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Per il caricamento batch incrementale, Databricks consiglia di usare Kafka con Trigger.AvailableNow. Si veda Configurazione dell'elaborazione batch incrementale.

In Databricks Runtime 13.3 LTS e versioni successive, Azure Databricks fornisce una funzione SQL per la lettura dei dati Kafka. Lo streaming con SQL è supportato solo in DLT o con tabelle di streaming in Databricks SQL. Vedere read_kafka funzione con valori di tabella.

Configurare il lettore di streaming strutturato Kafka

Azure Databricks fornisce la parola chiave kafka come formato dati per configurare le connessioni a Kafka 0.10+.

Le seguenti sono configurazioni comuni per Kafka:

Esistono diversi modi per specificare gli argomenti da sottoscrivere. È consigliabile specificare solo uno di questi parametri:

Opzione valore Descrizione
iscriviti Elenco di argomenti delimitato da virgole. Elenco di argomenti a cui sottoscrivere.
modelloDiSottoscrizione Stringa di espressione regolare di Java. Modello utilizzato per iscriversi agli argomento/i.
assegnare Stringa JSON {"topicA":[0,1],"topic":[2,4]}. Partizioni di argomenti specifiche da utilizzare.

Altre configurazioni rilevanti:

Opzione valore Valore predefinito Descrizione
kafka.bootstrap.servers Elenco delimitato da virgole di host:port. vuoto [Necessario] Configurazione Kafka bootstrap.servers. Se non sono presenti dati da Kafka, controllare prima l'elenco di indirizzi del broker. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. Ciò è dovuto al fatto che il client Kafka presuppone che alla fine i broker diventino disponibili e, in caso di errori di rete, continui a riprovare indefinitamente.
failOnDataLoss true o false. true [Facoltativo] Se la query deve fallire quando è possibile che i dati siano andati persi. Le query possono non riuscire a leggere in modo permanente i dati da Kafka a causa di molti scenari, ad esempio argomenti eliminati, troncamento degli argomenti prima dell'elaborazione e così via. Si tenta di stimare in modo conservativo se i dati sono stati probabilmente persi o meno. A volte questo può causare falsi allarmi. Impostare questa opzione su false se non funziona come previsto o si vuole che la query continui l'elaborazione nonostante la perdita di dati.
minPartitions Valore intero >= 0, 0 = disabilitato. 0 (disabilitata) [Facoltativo] Numero minimo di partizioni da leggere da Kafka. È possibile configurare Spark per usare un minimo arbitrario di partizioni da leggere da Kafka usando l'opzione minPartitions. In genere, Spark ha una mappatura 1-1 delle partizioni dei topic di Kafka alle partizioni di Spark che consumano dati da Kafka. Se si imposta l'opzione minPartitions su un valore maggiore del numero di partizioni del topic Kafka, Spark suddividerà le grandi partizioni Kafka in parti più piccole. Questa opzione può essere impostata in momenti di picco di caricamento, sfasamento dei dati e man mano che il flusso è in ritardo per aumentare la velocità di elaborazione. Comporta un costo di inizializzazione dei consumatori Kafka a ogni attivazione, che può influire sulle prestazioni se si utilizza SSL durante la connessione a Kafka.
kafka.group.id ID del gruppo di consumatori Kafka. non impostato [Facoltativo] ID gruppo da usare durante la lettura da Kafka. Usare con cautela. Per impostazione predefinita, ogni query genera un ID di gruppo univoco per la lettura dei dati. In questo modo, ogni query ha un proprio gruppo di consumatori che non subisce interferenze da parte di nessun altro consumatore e pertanto può leggere tutte le partizioni dei topic sottoscritti. In alcuni scenari, ad esempio l'autorizzazione basata su gruppo Kafka, è possibile usare ID gruppo autorizzati specifici per leggere i dati. Facoltativamente, è possibile impostare l'ID gruppo. Tuttavia, eseguire questa operazione con estrema cautela perché può causare comportamenti imprevisti.
  • È probabile che le query in esecuzione simultanea (sia batch che streaming) con lo stesso ID gruppo interferiscano tra loro, causando la sola lettura di parte dei dati da parte di ogni query.
  • Ciò può verificarsi anche quando le query vengono avviate/riavviate in rapida successione. Per ridurre al minimo questi problemi, impostare la configurazione del consumer Kafka session.timeout.ms affinché sia molto piccola.
startingOffsets più antico, più recente più recente [Facoltativo] Il punto di partenza quando viene avviata una query, o "earliest", che indica gli offset più remoti, o una stringa JSON che specifica un offset iniziale per ciascun TopicPartition. Nel codice JSON, -2 come offset può essere usato per fare riferimento al primo termine disponibile e -1 all'ultimo termine disponibile. Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in json) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query e la ripresa continuerà sempre da dove la query è stata interrotta. Le nuove partizioni individuate durante una query inizieranno al più presto possibile.

Per altre configurazioni facoltative, vedere Guida all'integrazione di Structured Streaming Kafka.

Schema dei Kafka record

Lo schema dei record Kafka è:

Colonna Tipo
chiave binario
valore binario
argomento string
partizione int
offset lungo
timestamp lungo
tipo di timestamp int

key e value vengono sempre deserializzati come matrici di byte con ByteArrayDeserializer. Usare operazioni dataframe (ad esempio cast("string")) per deserializzare in modo esplicito le chiavi e i valori.

Scrivere dati in Kafka

Di seguito è riportato un esempio di scrittura di streaming in Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks supporta anche la semantica di scrittura batch nei sink di dati Kafka, come illustrato nell'esempio seguente:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configurare lo scrittore di streaming strutturato Kafka

Importante

Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients che abilita le scritture idempotenti per impostazione predefinita. Se un sink Kafka usa la versione 2.8.0 o successiva con ACL configurati, ma senza IDEMPOTENT_WRITE abilitato, la scrittura non riesce con il messaggio di errore org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Risolvere questo errore eseguendo l'aggiornamento a Kafka versione 2.8.0 o successiva oppure impostando .option(“kafka.enable.idempotence”, “false”) durante la configurazione del writer Structured Streaming.

Lo schema fornito a DataStreamWriter interagisce con il sink di Kafka. Puoi usare i seguenti campi:

Nome colonna Obbligatorio o facoltativo Tipo
key facoltativo STRING oppure BINARY
value Obbligatorio STRING oppure BINARY
headers facoltativo ARRAY
topic facoltativo (ignorato se topic è impostato come opzione di scrittura) STRING
partition facoltativo INT

Di seguito sono riportate le opzioni comuni configurate durante la scrittura su Kafka:

Opzione valore Valore predefinito Descrizione
kafka.boostrap.servers Elenco delimitato da virgole di <host:port> Nessuno [Necessario] Configurazione Kafka bootstrap.servers.
topic STRING non impostato [Facoltativo] Imposta l'argomento per tutte le righe da scrivere. Questa opzione esegue l'override di qualsiasi colonna di argomento presente nei dati.
includeHeaders BOOLEAN false [Opzionale] Specifica se includere le intestazioni di Kafka nella riga.

Per altre configurazioni facoltative, vedere Guida all'integrazione di Structured Streaming Kafka.

Ottenere le metriche di Kafka

È possibile ottenere la media, il minimo e il massimo del numero di offset per i quali la streaming query è in ritardo rispetto all'offset più recente disponibile tra tutti i topic sottoscritti, utilizzando le metriche avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Vedere Lettura interattiva delle metriche.

Nota

Disponibile in Databricks Runtime 9.1 e versioni successive.

Ottenere il numero totale stimato di byte che il processo di query non ha utilizzato dagli argomenti sottoscritti esaminando il valore di estimatedTotalBytesBehindLatest. Questa stima è basata sui batch elaborati negli ultimi 300 secondi. L'intervallo di tempo su cui si basa la stima può essere modificato impostando l'opzione bytesEstimateWindowLength su un valore diverso. Ad esempio, per impostarlo su 10 minuti:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se si esegue lo streaming in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard della query di streaming:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Usare SSL per connettere Azure Databricks a Kafka

Per abilitare le connessioni SSL a Kafka, seguire le istruzioni nella documentazione di Confluent Crittografia e autenticazione con SSL. È possibile specificare le configurazioni descritte, precedute da kafka., come opzioni. Ad esempio, puoi specificare il percorso del trust store nella proprietà kafka.ssl.truststore.location.

Databricks consiglia:

L'esempio seguente usa i percorsi di archiviazione degli oggetti e i segreti di Databricks per abilitare una connessione SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Connettere Kafka in HDInsight ad Azure Databricks

  1. Creare un cluster Kafka in HDInsight.

    Vedere Connettersi a Kafka in HDInsight tramite una rete virtuale di Azure per le istruzioni.

  2. Configurare i broker Kafka per pubblicizzare l'indirizzo corretto.

    Seguire le istruzioni in Configurare Kafka per la pubblicità IP. Se gestisci Kafka su macchine virtuali di Azure, assicurati che la configurazione advertised.listeners dei broker sia impostata sull'indirizzo IP interno degli host.

  3. Creare un cluster di Azure Databricks.

  4. Collegare il cluster Kafka al cluster Azure Databricks.

    Seguire le istruzioni in Reti peer virtuali.

Autenticazione del Principal del Servizio con Microsoft Entra ID e Azure Event Hubs

Azure Databricks supporta l'autenticazione dei processi Spark con i servizi di Hub eventi. Questa autenticazione viene eseguita tramite OAuth con Microsoft Entra ID.

Diagramma di autenticazione AAD

Azure Databricks supporta l'autenticazione con ID Microsoft Entra con un ID client e un segreto negli ambienti di calcolo seguenti:

  • Databricks Runtime 12.2 LTS e versioni successive sui sistemi di calcolo configurati con modalità di accesso dedicato (in precedenza modalità di accesso utente singolo).
  • Databricks Runtime 14.3 LTS e versioni successive sulla computazione configurata con la modalità di accesso standard (in precedenza modalità di accesso condiviso).
  • Pipeline DLT configurate senza Catalogo Unity.

Azure Databricks non supporta l'autenticazione con ID Entra di Microsoft con un certificato in qualsiasi ambiente di calcolo o nelle pipeline DLT configurate con Unity Catalog.

Questa autenticazione non funziona nel calcolo con modalità di accesso standard o in DLT del catalogo Unity.

Configurazione del connettore Kafka Structured Streaming

Per eseguire l'autenticazione con Microsoft Entra ID, sono necessari i valori seguenti:

  • Un ID del tenant. È possibile trovarla nella scheda Servizi Microsoft Entra ID.

  • ID cliente (noto anche come ID dell'applicazione).

  • Un segreto del cliente. Una volta ottenuto questo, è necessario aggiungerlo come segreto all'area di lavoro di Databricks. Per aggiungere questo segreto, vedere Gestione dei segreti.

  • Argomento di EventHubs. È possibile trovare un elenco di argomenti nella sezione Hub degli Eventi sotto la sezione Entità su una pagina specifica dello spazio dei nomi di Event Hubs. Per lavorare con più argomenti, è possibile impostare il ruolo IAM a livello di Event Hubs.

  • Un server EventHubs. È possibile trovarla nella pagina di panoramica del namespace di Event Hubs specifico:

    Namespace di Event Hubs

Inoltre, per usare Entra ID, è necessario indicare a Kafka di usare il meccanismo SASL OAuth (SASL è un protocollo generico e OAuth è un tipo di "meccanismo" SASL):

  • kafka.security.protocol deve essere SASL_SSL
  • kafka.sasl.mechanism deve essere OAUTHBEARER
  • kafka.sasl.login.callback.handler.class dovrebbe essere un nome completamente qualificato della classe Java con un valore di kafkashaded per il gestore di callback di login della nostra classe Kafka shaded. Vedere l'esempio seguente per la classe esatta.

Esempio

Si esaminerà quindi un esempio in esecuzione:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Gestione degli errori potenziali

  • Le opzioni di streaming non sono supportate.

    Se si tenta di usare questo meccanismo di autenticazione in una pipeline DLT configurata con il catalogo Unity, è possibile che venga visualizzato l'errore seguente:

    Errore di streaming non supportato

    Per risolvere questo errore, usare una configurazione di calcolo supportata. Vedere Autenticazione entità servizio con Microsoft Entra ID e Azure Event Hub.

  • Non è stato possibile creare un nuovo KafkaAdminClient.

    Si tratta di un errore interno che Kafka genera se una delle opzioni di autenticazione seguenti non è corretta:

    • ID client (detto anche ID applicazione)
    • ID del locatario
    • Server di EventHubs

    Per risolvere l'errore, verificare che i valori siano corretti per queste opzioni.

    Inoltre, è possibile che venga visualizzato questo errore se si modificano le opzioni di configurazione fornite per impostazione predefinita nell'esempio (a cui è stato chiesto di non modificare), ad esempio kafka.security.protocol.

  • Non vengono restituiti record

    Se si sta tentando di visualizzare o elaborare il dataframe ma non si ottengono risultati, nell'interfaccia utente verrà visualizzato quanto segue.

    Nessun messaggio dei risultati

    Questo messaggio indica che l'autenticazione ha avuto esito positivo, ma EventHubs non ha restituito dati. Alcuni possibili motivi (anche se non esaustivi) sono:

    • È stato specificato l'argomento EventHubs errato.
    • L'opzione di configurazione Kafka predefinita per startingOffsets è latest e attualmente non si ricevono dati tramite l'argomento. È possibile impostare startingOffsetstoearliest per iniziare a leggere i dati a partire dai primi offset di Kafka.