Przetwarzanie strumieniowe przy użyciu platformy Apache Kafka i usługi Azure Databricks
W tym artykule opisano, jak można używać platformy Apache Kafka jako źródła lub ujścia podczas uruchamiania obciążeń przesyłania strumieniowego ze strukturą w usłudze Azure Databricks.
Aby uzyskać więcej informacji na temat platformy Kafka, zobacz dokumentację platformy Kafka.
Odczytywanie danych z platformy Kafka
Poniżej przedstawiono przykład odczytu strumieniowego z Kafki:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Usługa Azure Databricks obsługuje również semantykę odczytu wsadowego dla źródeł danych Kafka, jak pokazano w poniższym przykładzie.
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
W przypadku ładowania przyrostowego wsadowego usługa Databricks zaleca używanie platformy Kafka z usługą Trigger.AvailableNow
. Zobacz Konfigurowanie przyrostowego przetwarzania wsadowego.
W środowisku Databricks Runtime 13.3 LTS i nowszym usługa Azure Databricks udostępnia funkcję SQL do odczytywania danych platformy Kafka. Przesyłanie strumieniowe przy użyciu języka SQL jest obsługiwane tylko w technologii DLT lub w tabelach przesyłania strumieniowego w usłudze Databricks SQL. Zobacz read_kafka
funkcji wartości tabeli.
Konfiguracja czytnika Kafka Structured Streaming
Usługa Azure Databricks udostępnia słowo kluczowe kafka
jako format danych do konfigurowania połączeń z platformą Kafka 0.10 lub nowszym.
Poniżej przedstawiono najbardziej typowe konfiguracje platformy Kafka:
Istnieje wiele sposobów określania tematów do zasubskrybowania. Należy podać tylko jeden z następujących parametrów:
Opcja | Wartość | Opis |
---|---|---|
subskrybowanie | Rozdzielona przecinkami lista tematów. | Lista tematów do subskrybowania. |
wzorzec subskrypcji | Ciąg wyrażenia regularnego języka Java. | Wzorzec używany do subskrybowania tematów. |
przydzielić | Ciąg JSON {"topicA":[0,1],"topic":[2,4]} . |
Określone tematyPartition do korzystania. |
Inne istotne konfiguracje:
Opcja | Wartość | Wartość domyślna | Opis |
---|---|---|---|
kafka.bootstrap.servers | Rozdzielona przecinkami lista host:port. | empty | [Wymagane] Plik konfiguracyjny platformy Kafka bootstrap.servers . Jeśli okaże się, że nie ma danych z platformy Kafka, najpierw sprawdź listę adresów brokera. Jeśli lista adresów brokera jest niepoprawna, może nie występować żadne błędy. Dzieje się tak, ponieważ klient Kafka zakłada, że brokerzy ostatecznie znów będą dostępni i w przypadku błędów sieci będzie próbować ponawiać w nieskończoność. |
failOnDataLoss |
true lub false . |
true |
[Opcjonalnie] Czy zapytanie nie powiodło się, jeśli jest możliwe, że dane zostały utracone. Zapytania mogą na stałe nie odczytywać danych z platformy Kafka z różnych powodów, takich jak usunięte wątki, przycięcie wątku przed jego przetworzeniem itp. Staramy się oszacować konserwatywnie, czy dane mogły być utracone, czy nie. Czasami może to spowodować fałszywe alarmy. Ustaw tę opcję na false , jeśli nie działa zgodnie z oczekiwaniami, lub chcesz, aby zapytanie kontynuowało przetwarzanie pomimo utraty danych. |
minimalna liczba partycji | Liczba całkowita > = 0, 0 oznacza wyłączenie. | 0 (wyłączone) | [Opcjonalnie] Minimalna liczba partycji do odczytu z platformy Kafka. Można skonfigurować Spark, aby używał dowolnej minimalnej liczby partycji do odczytu z platformy Kafka przy użyciu opcji minPartitions . Zwykle Spark ma mapowanie 1–1 partycji tematycznych Kafki na partycje Spark, które konsumują dane z Kafki. Jeśli ustawisz opcję minPartitions na wartość większą niż partycje tematu Kafka, platforma Spark podzieli duże partycje Kafka na mniejsze części. Tę opcję można ustawić w okresach szczytowych obciążeń, przy nierównomierności danych oraz gdy zaobserwujesz opóźnienia w przetwarzaniu strumienia, aby zwiększyć szybkość przetwarzania. Wiąże się to z kosztem inicjowania użytkowników platformy Kafka w każdym wyzwalaczu, co może mieć wpływ na wydajność w przypadku używania protokołu SSL podczas nawiązywania połączenia z platformą Kafka. |
kafka.group.id | Identyfikator grupy odbiorców platformy Kafka. | nie ustawiono | [Opcjonalnie] Identyfikator grupy do użycia podczas odczytywania z platformy Kafka. Użyj tego z ostrożnością. Domyślnie każde zapytanie generuje unikatowy identyfikator grupy do odczytywania danych. Gwarantuje to, że każde zapytanie ma własną grupę odbiorców, która nie ma wpływu na żadnego innego użytkownika, i dlatego może odczytywać wszystkie partycje subskrybowanych tematów. W niektórych scenariuszach (na przykład autoryzacja oparta na grupach platformy Kafka) możesz użyć określonych autoryzowanych identyfikatorów grup do odczytywania danych. Opcjonalnie można ustawić identyfikator grupy. Jednak zrób to z skrajną ostrożnością, ponieważ może to spowodować nieoczekiwane zachowanie.
|
startOffsets | najwcześniejsza , najnowsza | najnowszy | [Opcjonalnie] Punkt początkowy, gdy zapytanie jest uruchamiane, to "najwcześniejsze", co oznacza początek od najwcześniejszych przesunięć, lub ciąg JSON określający przesunięcie początkowe dla każdej TopicPartition. W pliku JSON -2 jako przesunięcie może służyć do odwoływania się do najwcześniejszego wpisu, a -1 do najnowszego wpisu. Uwaga: w przypadku zapytań wsadowych użycie najnowszych danych (niejawnie lub przy użyciu -1 w formacie json) nie jest dozwolone. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie, a wznowienie zawsze będzie pobierane z miejsca, w którym zapytanie zostało przerwane. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. |
Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji Kafka w ramach uporządkowanego przesyłania strumieniowego.
Schemat rekordów platformy Kafka
Schemat rekordów platformy Kafka to:
Kolumna | Rodzaj |
---|---|
klucz | binarny |
wartość | binarny |
temat | string |
partycja | int |
Przesunięcie | długi |
znacznik czasu | długi |
timestampType | int |
Obiekty key
i value
są zawsze deserializowane jako tablice bajtów przy użyciu ByteArrayDeserializer
. Użyj operacji ramki danych (takich jak cast("string")
), aby jawnie deserializować klucze i wartości.
Zapisywanie danych na platformie Kafka
Poniżej przedstawiono przykład przesyłania strumieniowego zapisu na platformie Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Usługa Azure Databricks obsługuje również semantykę zapisu wsadowego do danych wyjściowych w Kafka, jak pokazano w poniższym przykładzie.
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Konfigurowanie składnika zapisu przesyłania strumieniowego Apache Kafka
Ważne
Środowisko Databricks Runtime 13.3 LTS i nowsze zawiera bardziej aktualną wersję biblioteki kafka-clients
, która domyślnie umożliwia idempotentne zapisy. Jeśli wyjście Kafka używa wersji 2.8.0 lub starszej ze skonfigurowanymi ACL, ale bez włączenia IDEMPOTENT_WRITE
, zapis kończy się niepowodzeniem z komunikatem o błędzie org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Rozwiąż błąd, uaktualniając do wersji 2.8.0 lub nowszej Kafki, albo ustawiając .option(“kafka.enable.idempotence”, “false”)
podczas konfigurowania pisarza Structured Streaming.
Schemat dostarczony do DataStreamWriter współdziała z wyjściem w Kafka. Możesz użyć następujących pól:
Nazwa kolumny | Wymagane lub opcjonalne | Rodzaj |
---|---|---|
key |
opcjonalny |
STRING lub BINARY |
value |
wymagane |
STRING lub BINARY |
headers |
opcjonalny | ARRAY |
topic |
opcjonalne (ignorowane, jeśli topic jest ustawiona jako opcja pisarza) |
STRING |
partition |
opcjonalny | INT |
Poniżej przedstawiono typowe opcje ustawione podczas zapisywania na platformie Kafka:
Opcja | Wartość | Wartość domyślna | Opis |
---|---|---|---|
kafka.boostrap.servers |
Rozdzielona przecinkami lista <host:port> |
Brak | [Wymagana] konfiguracja Kafka bootstrap.servers . |
topic |
STRING |
nie ustawiono | [Opcjonalnie] Ustawia temat dla wszystkich wierszy, które mają zostać zapisane. Ta opcja zastępuje dowolną kolumnę tematu, która istnieje w danych. |
includeHeaders |
BOOLEAN |
false |
[Opcjonalnie] Określa, czy w wierszu mają być uwzględniane nagłówki Kafki. |
Aby uzyskać informacje o innych opcjonalnych konfiguracjach, zobacz Przewodnik integracji Kafka w ramach uporządkowanego przesyłania strumieniowego.
Pobieranie metryk platformy Kafka
Możesz uzyskać średnią, minimalną i maksymalną liczbę przesunięć, o które zapytanie strumieniowe opóźnia się względem najnowszego dostępnego przesunięcia wśród wszystkich subskrybowanych tematów za pomocą metryk avgOffsetsBehindLatest
, maxOffsetsBehindLatest
i minOffsetsBehindLatest
. Zobacz Interaktywne odczytywanie metryk.
Uwaga
Dostępne w środowisku Databricks Runtime 9.1 lub nowszym.
Uzyskaj szacowaną łączną liczbę bajtów, których proces zapytania jeszcze nie wykorzystał z subskrybowanych tematów, sprawdzając wartość estimatedTotalBytesBehindLatest
. To oszacowanie jest oparte na partiach, które zostały przetworzone w ciągu ostatnich 300 sekund. Przedział czasu, na podstawie którego jest szacowany, można zmienić, ustawiając opcję bytesEstimateWindowLength
na inną wartość. Aby na przykład ustawić go na 10 minut:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Jeśli używasz strumienia w notatniku, możesz zobaczyć te wskaźniki w zakładce Nieprzetworzone dane na tablicy postępu zapytania streamingowego.
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Łączenie usługi Azure Databricks z platformą Kafka przy użyciu protokołu SSL
Aby włączyć połączenia SSL z platformą Kafka, postępuj zgodnie z instrukcjami w dokumentacji platformy Confluent Encryption and Authentication with SSL. Konfiguracje opisane w tym miejscu można podać z prefiksem kafka.
, jako opcje. Na przykład należy określić lokalizację magazynu zaufania we właściwości kafka.ssl.truststore.location
.
Databricks zaleca, abyś:
- Przechowywanie certyfikatów w magazynie obiektów w chmurze. Dostęp do certyfikatów można ograniczyć tylko do klastrów, które mogą uzyskiwać dostęp do platformy Kafka. Zobacz zarządzanie danymi za pomocą usługi Unity Catalog.
- Przechowuj hasła certyfikatu jako tajne w zakresie tajnym.
W poniższym przykładzie użyto lokalizacji magazynu obiektów oraz tajemnic Databricks, aby umożliwić połączenie SSL.
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Łączenie platformy Kafka w usłudze HDInsight z usługą Azure Databricks
Utwórz klaster HDInsight Kafka.
Aby uzyskać instrukcje, zobacz Connect to Kafka on HDInsight through an Azure Virtual Network (Nawiązywanie połączenia z platformą Kafka w usłudze HDInsight za pośrednictwem sieci wirtualnej platformy Azure).
Skonfiguruj brokerów Kafka, aby udostępniać poprawny adres.
Postępuj zgodnie z instrukcjami w Konfigurowanie platformy Kafka dla ogłaszania IP. Jeśli samodzielnie zarządzasz platformą Kafka w usłudze Azure Virtual Machines, upewnij się, że konfiguracja
advertised.listeners
brokerów jest ustawiona na wewnętrzny adres IP hostów.Utwórz klaster usługi Azure Databricks.
Połącz klaster Kafka z klastrem Azure Databricks.
Postępuj zgodnie z instrukcjami w Równorzędnych sieciach wirtualnych.
Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs
Usługa Azure Databricks obsługuje uwierzytelnianie zadań platformy Spark za pomocą usług Event Hubs. To uwierzytelnianie odbywa się za pośrednictwem protokołu OAuth z identyfikatorem Entra firmy Microsoft.
Usługa Azure Databricks obsługuje uwierzytelnianie za pomocą identyfikatora Microsoft Entra z identyfikatorem klienta i tajnym kluczem w następujących środowiskach obliczeniowych:
- Databricks Runtime 12.2 LTS lub nowszy na maszynie obliczeniowej skonfigurowanej z dedykowanym trybem dostępu (wcześniej znany jako tryb dostępu dla pojedynczego użytkownika).
- Środowisko Databricks Runtime 14.3 LTS lub nowsze na jednostkach obliczeniowych skonfigurowanych z trybem standardowego dostępu (dawniej tryb dostępu współdzielonego).
- Potoki danych DLT skonfigurowane bez katalogu Unity.
Azure Databricks nie obsługuje uwierzytelniania Microsoft Entra ID z certyfikatem w żadnym środowisku obliczeniowym ani w potokach DLT skonfigurowanych z Unity Catalog.
To uwierzytelnianie nie działa na obliczeniach ze standardowym trybem dostępu ani na Unity Catalog DLT.
Konfigurowanie łącznika platformy Kafka ze strukturą przesyłania strumieniowego
Aby przeprowadzić uwierzytelnianie przy użyciu identyfikatora Entra firmy Microsoft, potrzebne są następujące wartości:
Identyfikator najemcy. Możesz to znaleźć w zakładce usług Microsoft Entra ID.
ClientID (znany również jako identyfikator aplikacji).
Sekret klienta Po uzyskaniu tego należy dodać to jako tajny sekret do obszaru roboczego Databricks. Aby dodać ten sekret, zobacz Zarządzanie sekretami.
Temat EventHubs. Listę tematów można znaleźć w sekcji Event Hubs pod sekcją Jednostki na określonej stronie przestrzeni nazw Event Hubs. Aby pracować z wieloma tematami, możesz ustawić rolę IAM na poziomie Event Hubs.
Serwer usługi EventHubs. Możesz to znaleźć na stronie przeglądu określonej przestrzeni nazw usługi Event Hubs:
Ponadto, aby używać identyfikatora Entra, musimy poinformować platformę Kafka, aby korzystała z mechanizmu SASL OAuth (SASL jest protokołem ogólnym, a protokół OAuth jest typem "mechanizmu" SASL):
-
kafka.security.protocol
powinna byćSASL_SSL
-
kafka.sasl.mechanism
powinna byćOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
powinna być w pełni kwalifikowaną nazwą klasy Java z wartościąkafkashaded
dla obsługi procedury logowania naszej osłoniętej klasy Kafka. Zobacz następujący przykład dotyczący konkretnej klasy.
Przykład
Następnie przyjrzyjmy się przykładowi w działaniu:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Obsługa potencjalnych błędów
Opcje przesyłania strumieniowego nie są obsługiwane.
Jeśli spróbujesz użyć tego mechanizmu uwierzytelniania w potoku DLT skonfigurowanym z Unity Catalog, możesz otrzymać następujący błąd:
Aby rozwiązać ten błąd, użyj obsługiwanej konfiguracji obliczeniowej. Zobacz Uwierzytelnianie podmiotu usługi z wykorzystaniem Microsoft Entra ID i Azure Event Hubs.
Nie można utworzyć nowego
KafkaAdminClient
.Jest to błąd wewnętrzny zgłaszany przez platformę Kafka, jeśli którakolwiek z następujących opcji uwierzytelniania jest niepoprawna:
- Identyfikator klienta (znany również jako identyfikator aplikacji)
- Identyfikator dzierżawy
- Serwer Usługi EventHubs
Aby rozwiązać ten problem, sprawdź, czy wartości są poprawne dla tych opcji.
Ponadto ten błąd może zostać wyświetlony, jeśli zmodyfikujesz opcje konfiguracji podane domyślnie w przykładzie (które zostały poproszone o niezmodyfikowanie), takie jak
kafka.security.protocol
.Nie są zwracane żadne rekordy
Jeśli próbujesz wyświetlić lub przetworzyć ramkę danych, ale nie otrzymujesz wyników, w interfejsie użytkownika zobaczysz następujące informacje.
Ten komunikat oznacza, że uwierzytelnianie zakończyło się pomyślnie, ale usługa EventHubs nie zwróciła żadnych danych. Niektóre możliwe (choć w żaden sposób wyczerpujące) przyczyny są następujące:
- Określono niewłaściwy temat usługi EventHubs .
- Domyślną opcją konfiguracji platformy Kafka dla
startingOffsets
jestlatest
, a obecnie nie otrzymujesz jeszcze żadnych danych przez topic. Możesz ustawićstartingOffsetstoearliest
, aby rozpocząć odczytywanie danych od offsetów początkowych w Kafka.