Uruchom pierwsze zadanie Streamingu Strukturalnego
Ten artykuł zawiera przykłady kodu i wyjaśnienie podstawowych pojęć niezbędnych do uruchamiania pierwszych zapytań przesyłania strumieniowego ze strukturą w usłudze Azure Databricks. Możesz użyć przesyłania strumieniowego ze strukturą na potrzeby obciążeń przetwarzania przyrostowego i niemal w czasie rzeczywistym.
Przesyłanie strumieniowe ze strukturą to jedna z kilku technologii, które zasilają tabele przesyłania strumieniowego w technologii DLT. Databricks zaleca używanie DLT do wszystkich nowych obciążeń ETL, pozyskiwania danych i przesyłania strumieniowego ze strukturą. Zobacz Co to jest DLT?.
Uwaga
Chociaż biblioteka DLT zapewnia nieco zmodyfikowaną składnię deklarowania tabel przesyłania strumieniowego, ogólna składnia konfigurowania odczytów i przekształceń przesyłania strumieniowego ma zastosowanie do wszystkich przypadków użycia przesyłania strumieniowego w usłudze Azure Databricks. Biblioteka DLT upraszcza również przesyłanie strumieniowe, zarządzając informacjami o stanie, metadanymi i wieloma konfiguracjami.
Użyj Auto Loader do odczytania danych przesyłanych strumieniowo z magazynu obiektów
W poniższym przykładzie pokazano ładowanie danych JSON za pomocą modułu automatycznego ładującego, które używa cloudFiles
do oznaczania formatu i opcji. Opcja schemaLocation
umożliwia wnioskowanie i ewolucję schematu. Wklej następujący kod w komórce notesu usługi Databricks i uruchom komórkę, aby utworzyć ramkę danych przesyłania strumieniowego o nazwie raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Podobnie jak w przypadku innych operacji odczytu na platformie Azure Databricks, konfigurowanie odczytu strumieniowego nie powoduje faktycznego załadowania danych. Musisz wyzwolić akcję na danych przed rozpoczęciem strumienia.
Uwaga
Wywołanie display()
ramki danych przesyłania strumieniowego uruchamia zadanie przesyłania strumieniowego. W przypadku większości zastosowań streamingu strukturalnego akcja wyzwalająca strumień powinna polegać na zapisywaniu danych do odbiornika. Zobacz Zagadnienia dotyczące produkcji Structured Streaming.
Przeprowadź transformację strumieniową
Przesyłanie strumieniowe ze strukturą obsługuje większość przekształceń dostępnych w usługach Azure Databricks i Spark SQL. Modele MLflow można nawet załadować jako funkcje zdefiniowane przez użytkownika i przewidywać przesyłanie strumieniowe jako transformację.
Poniższy przykład kodu wykonuje prostą transformację, aby wzbogacić pozyskane dane JSON o dodatkowe informacje przy użyciu funkcji Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
transformed_df
Wynik zawiera instrukcje zapytania dotyczące ładowania i przekształcania każdego rekordu w momencie jego nadejścia do źródła danych.
Uwaga
Przesyłanie strumieniowe ze strukturą traktuje źródła danych jako niezwiązane lub nieskończone zestawy danych. W związku z tym niektóre przekształcenia nie są obsługiwane w obciążeniach pracy dla usługi Strukturalnego Przesyłania Strumieniowego, ponieważ wymagają one sortowania nieskończonej ilości elementów.
Większość agregacji i wielu łączeń wymaga zarządzania informacjami o stanie za pomocą znaków wodnych, okien i trybu wyjściowego. Zobacz Stosowanie wodnych znaków do kontroli progów przetwarzania danych.
Wykonywanie przyrostowego zapisu wsadowego w usłudze Delta Lake
Poniższy przykład zapisuje do Delta Lake przy użyciu określonej ścieżki pliku i punktu kontrolnego.
Ważne
Zawsze upewnij się, że wybierasz unikalną lokalizację punktu kontrolnego dla każdego pisarza strumieniowego, którego konfigurujesz. Punkt kontrolny zapewnia unikatową tożsamość strumienia, śledząc wszystkie przetworzone rekordy i informacje o stanie skojarzone z zapytaniem przesyłanym strumieniowo.
Ustawienie availableNow
wyzwalacza powoduje, że Strukturyzowane Przesyłanie Strumieniowe przetwarza wszystkie wcześniej nieprzetworzone rekordy ze źródłowego zestawu danych, a następnie zamyka się, aby można było bezpiecznie wykonać następujący kod bez obaw o pozostawienia uruchomionego strumienia.
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
W tym przykładzie żadne nowe rekordy nie docierają do naszego źródła danych, więc powtórzenie wykonywania tego kodu nie powoduje pozyskiwania nowych rekordów.
Ostrzeżenie
Structured Streaming może zapobiec automatycznemu zakończeniu pracy zasobów obliczeniowych. Aby uniknąć nieoczekiwanych kosztów, pamiętaj o przerwaniu zapytań przesyłania strumieniowego.
Odczytywanie danych z usługi Delta Lake, przekształcanie i zapisywanie w usłudze Delta Lake
Delta Lake ma rozbudowaną obsługę pracy z Strukturalnym Strumieniowaniem zarówno jako źródło danych, jak i odbiornik. Zobacz tabelę Delta odczytów i zapisów strumieniowych.
W poniższym przykładzie pokazano przykładową składnię umożliwiającą przyrostowe ładowanie wszystkich nowych rekordów z tabeli delty, łączenie ich z migawką innej tabeli delty i zapisywanie ich w tabeli delty:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Musisz mieć skonfigurowane uprawnienia do odczytu tabel źródłowych, zapisu w tabelach docelowych oraz dostępu do lokalizacji punktu kontrolnego. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>
) przy użyciu odpowiednich wartości dla źródeł danych i ujść.
Uwaga
DLT zapewnia w pełni deklaratywną składnię do tworzenia potoków Delta Lake i automatycznie zarządza właściwościami, takimi jak wyzwalacze i punkty kontrolne. Zobacz Co to jest DLT?.
Odczytywanie danych z platformy Kafka, przekształcanie i zapisywanie na platformie Kafka
Platforma Apache Kafka i inne magistrale obsługi komunikatów zapewniają jedne z najniższych opóźnień dostępnych dla dużych zestawów danych. Za pomocą usługi Azure Databricks można zastosować przekształcenia do danych pozyskanych z platformy Kafka, a następnie zapisywać dane z powrotem na platformie Kafka.
Uwaga
Zapisywanie danych w magazynie obiektów w chmurze zwiększa dodatkowe obciążenie związane z opóźnieniami. Jeśli chcesz przechowywać dane z magistrali komunikatów w Delta Lake, ale wymagasz jak najmniejszego opóźnienia w przypadku obciążeń przesyłanych strumieniowo, Databricks zaleca skonfigurowanie oddzielnych zadań przesyłania strumieniowego w celu zapisywania danych do Lakehouse i stosowania przekształceń niemal w czasie rzeczywistym dla podrzędnych wyjść magistrali komunikatów.
Poniższy przykład kodu przedstawia prosty wzorzec wzbogacania danych z platformy Kafka przez dołączenie ich do danych w tabeli delty, a następnie zapisanie z powrotem na platformie Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Musisz mieć odpowiednie uprawnienia skonfigurowane do uzyskiwania dostępu do usługi Kafka. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>
) przy użyciu odpowiednich wartości dla źródeł danych i ujść. Zobacz Przetwarzanie strumieniowe przy użyciu platform Apache Kafka i Azure Databricks.