Udostępnij za pośrednictwem


Przesyłanie strumieniowe rekordów do usług zewnętrznych za pomocą ujściów DLT

Ważny

Interfejs API sink DLT znajduje się w publicznej wersji zapoznawczej .

W tym artykule opisano interfejs API sink DLT i sposób używania go z przepływów DLT, aby zapisywać rekordy przekształcone przez potok do zewnętrznego repozytorium danych, takiego jak zarządzane i zewnętrzne tabele w katalogu Unity, tabele magazynu metadanych Hive i usługi przesyłania strumieniowego zdarzeń, takie jak Apache Kafka lub Azure Event Hubs.

Co to są zlewy DLT?

Odbiorniki DLT umożliwiają zapisywanie przekształconych danych do celów, takich jak usługi strumieniowania zdarzeń: Apache Kafka lub Azure Event Hubs, oraz tabele zewnętrzne zarządzane przez katalog Unity lub metastore Hive. Wcześniej tabele strumieniowe i zmaterializowane widoki utworzone w potokach DLT mogły być utrwalane tylko w zarządzanych tabelach Delta usługi Azure Databricks. Dzięki ujściom masz teraz więcej opcji utrwalania danych wyjściowych potoków DLT.

Kiedy należy używać ujściów DLT?

Usługa Databricks zaleca korzystanie z ujściów DLT, jeśli potrzebujesz:

  • Utwórz przypadek użycia operacyjnego, taki jak wykrywanie oszustw, analiza w czasie rzeczywistym i rekomendacje klientów. Przypadki użycia operacyjnego zwykle odczytują dane z magistrali komunikatów, takie jak temat w Apache Kafka, a następnie przetwarzają dane z niskimi opóźnieniami i zapisują przetworzone rekordy z powrotem do magistrali komunikatów. Takie podejście umożliwia osiągnięcie mniejszego opóźnienia, dzięki rezygnacji z zapisywania i odczytywania danych z magazynu w chmurze.
  • Zapisywanie przekształconych danych z przepływów DLT do tabel zarządzanych przez zewnętrzne wystąpienie Delta, w tym tabel zarządzanych przez Unity Catalog, tabel zewnętrznych oraz tabel metastore Hive.
  • Wykonaj odwrotne wyodrębnianie-przekształcanie-ładowanie (ETL) w ujściach zewnętrznych dla usługi Databricks, takich jak tematy platformy Apache Kafka. Takie podejście umożliwia efektywne wsparcie przypadków użycia, w których dane muszą być odczytywane lub używane poza tabelami katalogu Unity lub inną pamięcią zarządzaną przez Databricks.

Jak używać ujściów DLT?

Notatka

  • Obsługiwane są tylko zapytania przesyłane strumieniowo przy użyciu spark.readStream i dlt.read_stream. Zapytania wsadowe nie są obsługiwane.
  • Tylko append_flow może być użyty do zapisywania w zlewach. Inne przepływy, takie jak apply_changes, nie są obsługiwane.
  • Uruchomienie aktualizacji pełnego odświeżania nie powoduje wyczyszczenia wcześniej obliczonych danych wyników w ujściach. Oznacza to, że wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.

Po zaabsorbowaniu danych zdarzeń ze źródła strumieniowego do potoku DLT, przetwarzasz i udoskonalasz te dane przy użyciu funkcji DLT, a następnie używasz przetwarzania strumienia dołączającego, aby przekazywać przekształcone rekordy danych do odbiornika DLT. Tworzysz ten zlew przy użyciu funkcji create_sink(). Aby uzyskać więcej informacji na temat korzystania z funkcji create_sink, zobacz dokumentację interfejsu API ujścia .

Aby zaimplementować odbiornik DLT, wykonaj następujące kroki:

  1. Skonfiguruj rurę DLT do przetwarzania zdarzeń strumieniowych i przygotowywania rekordów do zapisywania w odbiorniku DLT.
  2. Skonfiguruj i utwórz ujście DLT, aby użyć preferowanego formatu ujścia docelowego.
  3. Użyj przepływu dołączającego , aby zapisać przygotowane rekordy do odbiornika.

Te kroki zostały omówione w pozostałej części tematu.

Konfigurowanie potoku DLT w celu przygotowania rekordów do zapisu w odbiorniku

Pierwszym krokiem jest skonfigurowanie DLT, aby przekształcić nieprzetworzone dane strumienia zdarzeń w przygotowane dane, które zapiszesz w odbiorniku.

Aby lepiej zrozumieć ten proces, możesz skorzystać z tego przykładu potoku DLT, który przetwarza dane zdarzeń strumienia kliknięć z przykładowych danych wikipedia-datasets w usłudze Databricks. Ten pipeline analizuje nieprzetworzone zestawy danych w celu identyfikacji stron Wikipedii, które łączą się ze stroną dokumentacji platformy Apache Spark, i stopniowo przetwarza te dane tylko do wierszy tabeli, w których link zawiera Apache_Spark..

W tym przykładzie potok DLT jest ustrukturyzowany przy użyciu architektury medalionu , która organizuje dane w różnych warstwach w celu zwiększenia jakości i wydajności przetwarzania.

Aby rozpocząć, załaduj nieprzetworzone rekordy JSON z zestawu danych do brązowej warstwy przy użyciu Automatycznego Ładowacza . Ten kod w języku Python pokazuje, jak utworzyć tabelę strumieniową o nazwie clickstream_raw, która zawiera nieprzetworzone dane ze źródła.

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

Po wykonaniu tego kodu dane znajdują się teraz na poziomie "brąz" (lub "dane surowe") architektury Medallion i wymagają oczyszczenia. Następny krok uściśla dane do poziomu „srebra”, który obejmuje czyszczenie typów danych i nazw kolumn oraz wykorzystanie oczekiwań DLT w celu zapewnienia integralności danych.

Poniższy kod pokazuje, jak to zrobić, czyszcząc i sprawdzając dane warstwy brązu do tabeli srebrnej clickstream_clean.

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

Aby opracować "złotą" warstwę struktury potoku, należy przefiltrować oczyszczone dane strumienia kliknięć, aby odizolować wpisy, na których strona odwołująca się jest Apache_Spark. W tym ostatnim przykładzie kodu wybierasz tylko niezbędne kolumny do zapisu w tabeli docelowej.

Poniższy kod ilustruje sposób tworzenia tabeli o nazwie spark_referrers reprezentującej warstwę złota:

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

Po zakończeniu tego procesu przygotowywania danych należy skonfigurować ujścia docelowe, w których zostaną zapisane oczyszczone rekordy.

Konfigurowanie ujścia DLT

Usługa Databricks obsługuje trzy typy ujściów docelowych, w których zapisujesz rekordy przetwarzane z danych strumienia:

  • Ujścia tabeli delty
  • Ujścia platformy Apache Kafka
  • Ujścia usługi Azure Event Hubs

Poniżej przedstawiono przykłady konfiguracji dla ujściów usługi Delta, Kafka i Azure Event Hubs:

Rozlewiska delty

Aby utworzyć ujście różnicowe według ścieżki pliku:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Aby utworzyć ujście delty według nazwy tabeli przy użyciu w pełni kwalifikowanej ścieżki wykazu i schematu:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

Ujścia platformy Kafka i usługi Azure Event Hubs

Ten kod działa zarówno dla ujść Apache Kafka, jak i Azure Event Hubs.

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

Po skonfigurowaniu odbiornika i przygotowaniu potoku DLT, można rozpocząć przesyłanie przetworzonych rekordów strumieniowo do odbiornika.

Pisanie do odbiornika DLT w trybie dołączania

Po skonfigurowaniu ujścia, następnym krokiem jest zapisanie przetworzonych rekordów, określając ujście jako miejsce docelowe dla danych wyjściowych pochodzących z przepływu dołączania. W tym celu należy określić ujście jako wartość target w dekoratorze append_flow.

  • W przypadku tabel zarządzanych i zewnętrznych Unity Catalog użyj formatu delta i w opcjach określ ścieżkę lub nazwę tabeli. Potoki DLT muszą być skonfigurowane, aby używać Unity Catalog.
  • W przypadku tematów Apache Kafka użyj formatu kafka i określ w opcjach nazwę tematu, informacje o połączeniu oraz informacje uwierzytelniające. Są to te same opcje obsługiwane przez ujście Kafka dla Spark Structured Streaming. Zobacz Konfigurowanie składnika zapisywania przesyłania strumieniowego ze strukturą platformy Kafka.
  • W przypadku usługi Azure Event Hubs użyj formatu kafka i określ nazwę usługi Event Hubs, informacje o połączeniu i informacje dotyczące uwierzytelniania w opcjach. Są to te same opcje obsługiwane w ujściu strumieniowania strukturalnego Spark Event Hubs, które korzysta z interfejsu Kafka. Zobacz Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs.
  • W przypadku tabel magazynu metadanych Hive użyj formatu delta i określ ścieżkę lub nazwę tabeli w opcjach. Twoje pipeline DLT należy skonfigurować tak, aby korzystały z magazynu metadanych Hive.

Poniżej przedstawiono przykłady konfigurowania przepływów do zapisywania w ujściach usługi Delta, Kafka i Azure Event Hubs przy użyciu rekordów przetwarzanych przez potok DLT.

Odbiornik delta

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Ujścia platformy Kafka i usługi Azure Event Hubs

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

Parametr value jest obowiązkowy dla ujścia usługi Azure Event Hubs. Dodatkowe parametry, takie jak key, partition, headersi topic, są opcjonalne.

Aby uzyskać więcej informacji na temat dekoratora append_flow, zobacz Użycie trybu dodawania, aby zapisać do tabeli strumieniowej z wielu źródeł strumieniowych.

ograniczenia

  • Obsługiwany jest tylko interfejs API języka Python. Język SQL nie jest obsługiwany.

  • Obsługiwane są tylko zapytania przesyłane strumieniowo przy użyciu spark.readStream i dlt.read_stream. Zapytania wsadowe nie są obsługiwane.

  • Tylko append_flow można użyć do zapisu w odbiornikach. Inne przepływy, takie jak apply_changes, nie są obsługiwane i nie można użyć ujścia w definicji zestawu danych DLT. Na przykład następujące nie są obsługiwane:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • W przypadku systemów Delta nazwa tabeli musi być w pełni kwalifikowana. W szczególności w przypadku tabel zewnętrznych zarządzanych przez Unity Catalog, nazwa tabeli musi mieć format <catalog>.<schema>.<table>. W przypadku magazynu metadanych Hive musi on znajdować się w postaci <schema>.<table>.

  • Uruchomienie FullRefresh nie spowoduje wyczyszczenia wcześniej obliczonych danych wynikowych w komponentach wyjściowych. Oznacza to, że wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.

  • Oczekiwania DLT nie są obsługiwane.

Zasoby