Przesyłanie strumieniowe rekordów do usług zewnętrznych za pomocą ujściów DLT
W tym artykule opisano interfejs API sink
DLT i sposób używania go z przepływów DLT, aby zapisywać rekordy przekształcone przez potok do zewnętrznego repozytorium danych, takiego jak zarządzane i zewnętrzne tabele w katalogu Unity, tabele magazynu metadanych Hive i usługi przesyłania strumieniowego zdarzeń, takie jak Apache Kafka lub Azure Event Hubs.
Co to są zlewy DLT?
Odbiorniki DLT umożliwiają zapisywanie przekształconych danych do celów, takich jak usługi strumieniowania zdarzeń: Apache Kafka lub Azure Event Hubs, oraz tabele zewnętrzne zarządzane przez katalog Unity lub metastore Hive. Wcześniej tabele strumieniowe i zmaterializowane widoki utworzone w potokach DLT mogły być utrwalane tylko w zarządzanych tabelach Delta usługi Azure Databricks. Dzięki ujściom masz teraz więcej opcji utrwalania danych wyjściowych potoków DLT.
Kiedy należy używać ujściów DLT?
Usługa Databricks zaleca korzystanie z ujściów DLT, jeśli potrzebujesz:
- Utwórz przypadek użycia operacyjnego, taki jak wykrywanie oszustw, analiza w czasie rzeczywistym i rekomendacje klientów. Przypadki użycia operacyjnego zwykle odczytują dane z magistrali komunikatów, takie jak temat w Apache Kafka, a następnie przetwarzają dane z niskimi opóźnieniami i zapisują przetworzone rekordy z powrotem do magistrali komunikatów. Takie podejście umożliwia osiągnięcie mniejszego opóźnienia, dzięki rezygnacji z zapisywania i odczytywania danych z magazynu w chmurze.
- Zapisywanie przekształconych danych z przepływów DLT do tabel zarządzanych przez zewnętrzne wystąpienie Delta, w tym tabel zarządzanych przez Unity Catalog, tabel zewnętrznych oraz tabel metastore Hive.
- Wykonaj odwrotne wyodrębnianie-przekształcanie-ładowanie (ETL) w ujściach zewnętrznych dla usługi Databricks, takich jak tematy platformy Apache Kafka. Takie podejście umożliwia efektywne wsparcie przypadków użycia, w których dane muszą być odczytywane lub używane poza tabelami katalogu Unity lub inną pamięcią zarządzaną przez Databricks.
Jak używać ujściów DLT?
Notatka
- Obsługiwane są tylko zapytania przesyłane strumieniowo przy użyciu
spark.readStream
idlt.read_stream
. Zapytania wsadowe nie są obsługiwane. - Tylko
append_flow
może być użyty do zapisywania w zlewach. Inne przepływy, takie jakapply_changes
, nie są obsługiwane. - Uruchomienie aktualizacji pełnego odświeżania nie powoduje wyczyszczenia wcześniej obliczonych danych wyników w ujściach. Oznacza to, że wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.
Po zaabsorbowaniu danych zdarzeń ze źródła strumieniowego do potoku DLT, przetwarzasz i udoskonalasz te dane przy użyciu funkcji DLT, a następnie używasz przetwarzania strumienia dołączającego, aby przekazywać przekształcone rekordy danych do odbiornika DLT. Tworzysz ten zlew przy użyciu funkcji create_sink()
. Aby uzyskać więcej informacji na temat korzystania z funkcji create_sink
, zobacz dokumentację interfejsu API ujścia .
Aby zaimplementować odbiornik DLT, wykonaj następujące kroki:
- Skonfiguruj rurę DLT do przetwarzania zdarzeń strumieniowych i przygotowywania rekordów do zapisywania w odbiorniku DLT.
- Skonfiguruj i utwórz ujście DLT, aby użyć preferowanego formatu ujścia docelowego.
- Użyj przepływu dołączającego , aby zapisać przygotowane rekordy do odbiornika.
Te kroki zostały omówione w pozostałej części tematu.
Konfigurowanie potoku DLT w celu przygotowania rekordów do zapisu w odbiorniku
Pierwszym krokiem jest skonfigurowanie DLT, aby przekształcić nieprzetworzone dane strumienia zdarzeń w przygotowane dane, które zapiszesz w odbiorniku.
Aby lepiej zrozumieć ten proces, możesz skorzystać z tego przykładu potoku DLT, który przetwarza dane zdarzeń strumienia kliknięć z przykładowych danych wikipedia-datasets
w usłudze Databricks. Ten pipeline analizuje nieprzetworzone zestawy danych w celu identyfikacji stron Wikipedii, które łączą się ze stroną dokumentacji platformy Apache Spark, i stopniowo przetwarza te dane tylko do wierszy tabeli, w których link zawiera Apache_Spark.
.
W tym przykładzie potok DLT jest ustrukturyzowany przy użyciu architektury medalionu , która organizuje dane w różnych warstwach w celu zwiększenia jakości i wydajności przetwarzania.
Aby rozpocząć, załaduj nieprzetworzone rekordy JSON z zestawu danych do brązowej warstwy przy użyciu Automatycznego Ładowacza . Ten kod w języku Python pokazuje, jak utworzyć tabelę strumieniową o nazwie clickstream_raw
, która zawiera nieprzetworzone dane ze źródła.
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Po wykonaniu tego kodu dane znajdują się teraz na poziomie "brąz" (lub "dane surowe") architektury Medallion i wymagają oczyszczenia. Następny krok uściśla dane do poziomu „srebra”, który obejmuje czyszczenie typów danych i nazw kolumn oraz wykorzystanie oczekiwań DLT w celu zapewnienia integralności danych.
Poniższy kod pokazuje, jak to zrobić, czyszcząc i sprawdzając dane warstwy brązu do tabeli srebrnej clickstream_clean
.
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Aby opracować "złotą" warstwę struktury potoku, należy przefiltrować oczyszczone dane strumienia kliknięć, aby odizolować wpisy, na których strona odwołująca się jest Apache_Spark
. W tym ostatnim przykładzie kodu wybierasz tylko niezbędne kolumny do zapisu w tabeli docelowej.
Poniższy kod ilustruje sposób tworzenia tabeli o nazwie spark_referrers
reprezentującej warstwę złota:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Po zakończeniu tego procesu przygotowywania danych należy skonfigurować ujścia docelowe, w których zostaną zapisane oczyszczone rekordy.
Konfigurowanie ujścia DLT
Usługa Databricks obsługuje trzy typy ujściów docelowych, w których zapisujesz rekordy przetwarzane z danych strumienia:
- Ujścia tabeli delty
- Ujścia platformy Apache Kafka
- Ujścia usługi Azure Event Hubs
Poniżej przedstawiono przykłady konfiguracji dla ujściów usługi Delta, Kafka i Azure Event Hubs:
Rozlewiska delty
Aby utworzyć ujście różnicowe według ścieżki pliku:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Aby utworzyć ujście delty według nazwy tabeli przy użyciu w pełni kwalifikowanej ścieżki wykazu i schematu:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Ujścia platformy Kafka i usługi Azure Event Hubs
Ten kod działa zarówno dla ujść Apache Kafka, jak i Azure Event Hubs.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Po skonfigurowaniu odbiornika i przygotowaniu potoku DLT, można rozpocząć przesyłanie przetworzonych rekordów strumieniowo do odbiornika.
Pisanie do odbiornika DLT w trybie dołączania
Po skonfigurowaniu ujścia, następnym krokiem jest zapisanie przetworzonych rekordów, określając ujście jako miejsce docelowe dla danych wyjściowych pochodzących z przepływu dołączania. W tym celu należy określić ujście jako wartość target
w dekoratorze append_flow
.
- W przypadku tabel zarządzanych i zewnętrznych Unity Catalog użyj formatu
delta
i w opcjach określ ścieżkę lub nazwę tabeli. Potoki DLT muszą być skonfigurowane, aby używać Unity Catalog. - W przypadku tematów Apache Kafka użyj formatu
kafka
i określ w opcjach nazwę tematu, informacje o połączeniu oraz informacje uwierzytelniające. Są to te same opcje obsługiwane przez ujście Kafka dla Spark Structured Streaming. Zobacz Konfigurowanie składnika zapisywania przesyłania strumieniowego ze strukturą platformy Kafka. - W przypadku usługi Azure Event Hubs użyj formatu
kafka
i określ nazwę usługi Event Hubs, informacje o połączeniu i informacje dotyczące uwierzytelniania w opcjach. Są to te same opcje obsługiwane w ujściu strumieniowania strukturalnego Spark Event Hubs, które korzysta z interfejsu Kafka. Zobacz Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs. - W przypadku tabel magazynu metadanych Hive użyj formatu
delta
i określ ścieżkę lub nazwę tabeli w opcjach. Twoje pipeline DLT należy skonfigurować tak, aby korzystały z magazynu metadanych Hive.
Poniżej przedstawiono przykłady konfigurowania przepływów do zapisywania w ujściach usługi Delta, Kafka i Azure Event Hubs przy użyciu rekordów przetwarzanych przez potok DLT.
Odbiornik delta
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Ujścia platformy Kafka i usługi Azure Event Hubs
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Parametr value
jest obowiązkowy dla ujścia usługi Azure Event Hubs. Dodatkowe parametry, takie jak key
, partition
, headers
i topic
, są opcjonalne.
Aby uzyskać więcej informacji na temat dekoratora append_flow
, zobacz Użycie trybu dodawania, aby zapisać do tabeli strumieniowej z wielu źródeł strumieniowych.
ograniczenia
Obsługiwany jest tylko interfejs API języka Python. Język SQL nie jest obsługiwany.
Obsługiwane są tylko zapytania przesyłane strumieniowo przy użyciu
spark.readStream
idlt.read_stream
. Zapytania wsadowe nie są obsługiwane.Tylko
append_flow
można użyć do zapisu w odbiornikach. Inne przepływy, takie jakapply_changes
, nie są obsługiwane i nie można użyć ujścia w definicji zestawu danych DLT. Na przykład następujące nie są obsługiwane:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
W przypadku systemów Delta nazwa tabeli musi być w pełni kwalifikowana. W szczególności w przypadku tabel zewnętrznych zarządzanych przez Unity Catalog, nazwa tabeli musi mieć format
<catalog>.<schema>.<table>
. W przypadku magazynu metadanych Hive musi on znajdować się w postaci<schema>.<table>
.Uruchomienie
FullRefresh
nie spowoduje wyczyszczenia wcześniej obliczonych danych wynikowych w komponentach wyjściowych. Oznacza to, że wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.Oczekiwania DLT nie są obsługiwane.
Zasoby
- tworzenie potoków DLT
- Ładowanie i przetwarzanie danych przyrostowo przy użyciu przepływów DLT
- dokumentacja interfejsu API ujścia języka Python