Dataströmbearbetning med Apache Kafka och Azure Databricks
I den här artikeln beskrivs hur du kan använda Apache Kafka som källa eller mottagare när du kör arbetsbelastningar för strukturerad direktuppspelning på Azure Databricks.
Mer information om Kafka finns i Kafka-dokumentationen.
Läs data från Kafka
Följande är ett exempel på en strömningsläsning från Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks stöder även batchläsningssemantik för Kafka-datakällor, som du ser i följande exempel:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
För inkrementell batchbearbetning rekommenderar Databricks att använda Kafka med Trigger.AvailableNow
. Se Konfigurera inkrementell batchbearbetning.
I Databricks Runtime 13.3 LTS och senare tillhandahåller Azure Databricks en SQL-funktion för att läsa Kafka-data. Direktuppspelning med SQL stöds endast i DLT eller med strömmande tabeller i Databricks SQL. Se read_kafka
tabellvärdesfunktion.
Konfigurera Kafka Structured Streaming-läsare
Azure Databricks tillhandahåller nyckelordet kafka
som ett dataformat för att konfigurera anslutningar till Kafka 0.10+.
Följande är de vanligaste konfigurationerna för Kafka:
Det finns flera sätt att ange vilka ämnen som ska prenumerera på. Du bör bara ange en av dessa parametrar:
Alternativ | Värde | beskrivning |
---|---|---|
prenumerera | En kommaavgränsad lista med ämnen. | Ämneslistan att prenumerera på. |
prenumerationsmönster | Java regex-sträng. | Det mönster som används för att prenumerera på ämnen. |
tilldela | JSON-sträng {"topicA":[0,1],"topic":[2,4]} . |
Specifika topicpartitioner att konsumera. |
Andra viktiga konfigurationer:
Alternativ | Värde | Standardvärde | beskrivning |
---|---|---|---|
kafka.bootstrap.servers | Kommaavgränsad lista över värd:port. | empty | [Krävs] Kafka-konfigurationen bootstrap.servers . Om du upptäcker att det inte finns några data från Kafka kontrollerar du listan med koordinatoradresser först. Om koordinatoradresslistan är felaktig kanske det inte finns några fel. Detta beror på att Kafka-klienten förutsätter att mäklarna blir tillgängliga så småningom och vid nätverksfel försökas det om och om igen. |
misslyckas vid dataförlust |
true eller false . |
true |
[Valfritt] Om frågan ska misslyckas när det är möjligt att data har gått förlorade. Frågor kan permanent misslyckas med att läsa data från Kafka på grund av många scenarier, till exempel borttagna ämnen, ämnestrunkering före bearbetning och så vidare. Vi försöker uppskatta om data eventuellt har gått förlorade eller inte. Ibland kan detta orsaka falska larm. Ställ in det här alternativet på false om det inte fungerar som förväntat, eller om du vill att frågan ska fortsätta bearbetas trots dataförlust. |
minPartitions | Heltal >= 0, 0 = avaktiverad. | 0 (inaktiverad) | [Valfritt] Minsta antal partitioner som ska läsas från Kafka. Du kan konfigurera Spark att använda ett godtyckligt minimum av partitioner för att läsa från Kafka med hjälp minPartitions av alternativet . Normalt har Spark en en-till-en-mappning av Kafka topicPartitions till Spark-partitioner som konsumerar från Kafka. Om du ställer in alternativet minPartitions till ett värde som är större än dina Kafka-ämnespartitioner kommer Spark att dela upp stora Kafka-partitioner i mindre delar. Det här alternativet kan ställas in vid tider med hög belastning, datasnedvridning och när dataströmmen halkar efter för att öka bearbetningshastigheten. Det kostar att initiera Kafka-konsumenter vid varje utlösare, vilket kan påverka prestanda om du använder SSL när du ansluter till Kafka. |
kafka.group.id | Ett Kafka-konsumentgrupps-ID. | inte inställt | [Valfritt] Grupp-ID som ska användas vid läsning från Kafka. Använd detta med försiktighet. Som standard genererar varje fråga ett unikt grupp-ID för att läsa data. Detta säkerställer att varje fråga har en egen konsumentgrupp som inte utsätts för interferens från någon annan konsument och därför kan läsa alla partitioner i sina prenumerationsavsnitt. I vissa scenarier (till exempel Kafka-gruppbaserad auktorisering) kanske du vill använda specifika auktoriserade grupp-ID:er för att läsa data. Du kan också ange grupp-ID:t. Men gör detta med extrem försiktighet eftersom det kan orsaka oväntat beteende.
|
startingOffsets | tidigaste , senaste | senaste | [Valfritt] Startpunkten när en fråga startas, antingen "tidigast" som är från de tidigaste förskjutningarna, eller en json-sträng som anger en startförskjutning för varje TopicPartition. I JSON kan -2 som en förskjutning användas för att referera till den tidigaste, -1 till den senaste. Obs! För batchfrågor är det inte tillåtet att använda 'senaste' (varken implicit eller genom att använda -1 i JSON). För direktuppspelningsfrågor gäller detta bara när en ny fråga startas, och det återupptas alltid där frågan slutade. Nyligen upptäckta partitioner under en sökning börjar vid tidigast möjliga tidpunkt. |
Se Integrationsguide för strukturerad streaming av Kafka för andra valfria konfigurationer.
Schema för Kafka-dataset
Schemat för Kafka-posterna är:
Kolumn | Typ |
---|---|
nyckel | binär |
värde | binär |
Ämne | sträng |
skifte | heltal |
förskjuta | lång |
tidsstämpel | lång |
timestampTyp | heltal |
key
och value
deserialiseras alltid som byte-arrayer med ByteArrayDeserializer
. Använd DataFrame-åtgärder (till exempel cast("string")
) för att explicit deserialisera nycklar och värden.
Skriva data till Kafka
Följande är ett exempel på en strömmande skrivning till Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks stöder även semantik för batchskrivning till Kafka-datamottagare, som visas i följande exempel:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Konfigurera Kafka Structured Streaming-skrivaren
Viktigt!
Databricks Runtime 13.3 LTS och senare innehåller en nyare version av kafka-clients
biblioteket som aktiverar idempotent-skrivningar som standard. Om en Kafka-mottagare använder version 2.8.0 eller senare med ACL:er konfigurerade, men utan IDEMPOTENT_WRITE
aktiverad, misslyckas skrivning med felmeddelandet org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Lös det här felet genom att uppgradera till Kafka version 2.8.0 eller senare, eller genom att ange .option(“kafka.enable.idempotence”, “false”)
när du konfigurerar din structured streaming-skrivare.
Det schema som tillhandahålls till DataStreamWriter interagerar med Kafka-sink. Du kan använda följande fält:
Kolumnnamn | Obligatorisk eller valfri | Typ |
---|---|---|
key |
valfri |
STRING eller BINARY |
value |
nödvändig |
STRING eller BINARY |
headers |
valfri | ARRAY |
topic |
valfritt (ignoreras om topic anges som skrivalternativ) |
STRING |
partition |
valfri | INT |
Följande är vanliga alternativ som anges när du skriver till Kafka:
Alternativ | Värde | Standardvärde | beskrivning |
---|---|---|---|
kafka.boostrap.servers |
En kommaavgränsad lista över <host:port> |
inget | [Krävs] Kafka-konfigurationen bootstrap.servers . |
topic |
STRING |
inte inställt | [Valfritt] Anger ämnet för alla rader som ska skrivas. Det här alternativet åsidosätter alla ämneskolumner som finns i data. |
includeHeaders |
BOOLEAN |
false |
[Valfritt] Om Kafka-rubrikerna ska inkluderas på raden. |
Se Guide för strukturerad strömmande Kafka-integration för andra valfria konfigurationer.
Hämta Kafka-mått
Du kan få medelvärdet, min och max för det antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumererade ämnen med måtten avgOffsetsBehindLatest
, maxOffsetsBehindLatest
och minOffsetsBehindLatest
. Se Läsa mått interaktivt.
Kommentar
Tillgänglig i Databricks Runtime 9.1 och senare.
Hämta det uppskattade totala antalet byte som frågeprocessen inte har förbrukat från de prenumererade ämnena genom att undersöka värdet för estimatedTotalBytesBehindLatest
. Den här uppskattningen baseras på de batchar som bearbetats under de senaste 300 sekunderna. Den tidsram som uppskattningen baseras på kan ändras genom att alternativet bytesEstimateWindowLength
anges till ett annat värde. Om du till exempel vill ange den till 10 minuter:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Om du kör dataströmmen i en notebook-fil kan du se dessa mått under fliken Rådata på instrumentpanelen för strömningsfrågans förlopp:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Använda SSL för att ansluta Azure Databricks till Kafka
Om du vill aktivera SSL-anslutningar till Kafka följer du anvisningarna i Confluent-dokumentationen Kryptering och autentisering med SSL. Du kan ange de konfigurationer som beskrivs där, prefixet med kafka.
, som alternativ. Du kan till exempel ange platsen för certifikatförrådet i egenskapen kafka.ssl.truststore.location
.
Databricks rekommenderar att du:
- Lagra dina certifikat i molnobjektlagring. Du kan endast begränsa åtkomsten till certifikaten till kluster som har åtkomst till Kafka. Se Datastyrning med Unity Catalog.
- Lagra dina certifikatlösenord som hemligheter i ett hemligt omfång.
I följande exempel används objektlagringsplatser och Databricks-hemligheter för att aktivera en SSL-anslutning:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Ansluta Kafka på HDInsight till Azure Databricks
Skapa ett HDInsight Kafka-kluster.
Mer information finns i Ansluta till Kafka i HDInsight via ett virtuellt Azure-nätverk .
Konfigurera Kafka-koordinatorerna så att de annonserar rätt adress.
Följ anvisningarna i Konfigurera Kafka för IP-reklam. Om du hanterar Kafka själv på Azure Virtual Machines, kontrollera att
advertised.listeners
-konfigurationen för brokerarna är inställd på de interna IP-adresserna hos värdarna.Skapa ett Azure Databricks-kluster.
Anslut Kafka-klustret till Azure Databricks-klustret.
Följ anvisningarna i peer-virtuella nätverk.
Autentisering av tjänstens huvudkonto med Microsoft Entra-ID och Azure Event Hubs
Azure Databricks stöder autentisering av Spark-jobb med Event Hubs-tjänster. Den här autentiseringen görs via OAuth med Microsoft Entra-ID.
Azure Databricks stöder Microsoft Entra-ID-autentisering med ett klient-ID och en hemlighet i följande beräkningsmiljöer:
- Databricks Runtime 12.2 LTS och senare för beräkningsresurser konfigurerade med dedikerat åtkomstläge (tidigare åtkomstläge för en enskild användare).
- Databricks Runtime 14.3 LTS och senare på beräkningsresurser konfigurerade i standardåtkomstläge (tidigare delat åtkomstläge).
- DLT-pipelines som konfigurerats utan Unity Catalog.
Azure Databricks stöder inte Microsoft Entra-ID-autentisering med ett certifikat i någon beräkningsmiljö eller i DLT-pipelines som konfigurerats med Unity Catalog.
Den här autentiseringen fungerar inte med beräkning med standardåtkomstläge eller på Unity Catalog DLT.
Konfigurera Kafka-anslutningsappen för strukturerad direktuppspelning
För att kunna utföra autentisering med Microsoft Entra-ID behöver du följande värden:
Ett hyrtagar-ID. Du hittar detta på fliken Microsoft Entra ID-tjänster .
Ett clientID (även kallat program-ID).
En klienthemlighet. När du har det här bör du lägga till det som en hemlighet i din Databricks-arbetsyta. Information om hur du lägger till den här hemligheten finns i Hemlighetshantering.
Ett EventHubs-ämne. Du hittar en lista med ämnen i avsnittet Event Hubs under avsnittet Entiteter på en specifik sida för Event Hubs-namnområde. Om du vill arbeta med flera ämnen kan du ange IAM-rollen på Event Hubs-nivå.
En EventHub-server. Du hittar detta på översiktssidan för ditt specifika Event Hubs-namnområde:
För att kunna använda Entra-ID måste vi dessutom be Kafka att använda OAuth SASL-mekanismen (SASL är ett generiskt protokoll och OAuth är en typ av SASL-mekanism):
-
kafka.security.protocol
bör varaSASL_SSL
-
kafka.sasl.mechanism
bör varaOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
bör vara ett fullständigt kvalificerat namn på Java-klassen med värdetkafkashaded
för inloggningsåteranropshanteraren i vår modifierade Kafka-klass. Se följande exempel för den exakta klassen.
Exempel
Låt oss titta på ett praktiskt exempel:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Hantera potentiella fel
Strömningsalternativ stöds inte.
Om du försöker använda den här autentiseringsmekanismen i en DLT-pipeline som konfigurerats med Unity Catalog kan du få följande fel:
Lös det här felet genom att använda en beräkningskonfiguration som stöds. Läs mer om tjänstehuvudnamn autentisering med Microsoft Entra ID och Azure Event Hubs.
Det gick inte att skapa en ny
KafkaAdminClient
.Det här är ett internt fel som Kafka genererar om något av följande autentiseringsalternativ är felaktigt:
- Klient-ID (även kallat program-ID)
- Hyresgäst-ID
- EventHubs-server
För att lösa felet kontrollerar du att värdena är korrekta för de här alternativen.
Dessutom kan det här felet visas om du ändrar konfigurationsalternativen som anges som standard i exemplet (som du uppmanas att inte ändra), till exempel
kafka.security.protocol
.Det finns inga poster som returneras
Om du försöker visa eller bearbeta din DataFrame men inte får resultat visas följande i användargränssnittet.
Det här meddelandet innebär att autentiseringen lyckades, men EventHubs returnerade inga data. Några möjliga (men inte på något sätt uttömmande) skäl är:
- Du hade fel i att ange EventHubs-ämne.
- Standardalternativet för Kafka-konfigurationen för
startingOffsets
ärlatest
, och du tar inte emot några data via topicen ännu. Du kan angestartingOffsetstoearliest
för att börja läsa data från Kafkas tidigaste offsets.