Strömma data till externa tjänster med DLT-sänkor
Viktig
DLT sink
-API:et finns i offentlig förhandsversion.
Den här artikeln beskriver DLT-sink
-API:et och hur du använder det med DLT-flöden för att skriva poster som transformeras av en pipeline till en extern datamottagare, till exempel hanterade och externa Unity Catalog-tabeller, Hive-metaarkivtabeller och händelseströmningstjänster som Apache Kafka eller Azure Event Hubs.
Vad är DLT-sänkor?
Med DLT-sänkor kan du skriva transformerade data till mål som händelseströmningstjänster som Apache Kafka eller Azure Event Hubs, samt externa tabeller hanterade av Unity Catalog eller Hive-metastore. Tidigare kunde strömmande tabeller och materialiserade vyer som skapats i en DLT-pipeline endast bevaras till Azure Databricks-hanterade Delta-tabeller. Med hjälp av mottagare har du nu fler alternativ för att spara utdata från dina DLT-pipelines.
När ska jag använda DLT-mottagare?
Databricks rekommenderar att du använder DLT-sinkar om du behöver:
- Skapa ett användningsfall som bedrägeriidentifiering, realtidsanalys och kundrekommendationer. Operativa användningsfall läser vanligtvis data från en meddelandebuss, till exempel ett Apache Kafka-ämne, och bearbetar sedan data med låg latens och skriver de bearbetade posterna tillbaka till en meddelandebuss. Med den här metoden kan du uppnå lägre svarstid genom att inte skriva eller läsa från molnlagring.
- Skriv transformerade data från dina DLT-flöden till tabeller som hanteras av en extern Delta-instans, inklusive hanterade Unity Catalog-tabeller och externa tabeller och Hive-metaarkivtabeller.
- Utför omvänd databearbetning (extract-transform-load, ETL) till datamottagare utanför Databricks, såsom Apache Kafka-ämnen. Med den här metoden kan du effektivt stödja användningsfall där data måste läsas eller användas utanför Unity Catalog-tabeller eller annan Databricks-hanterad lagring.
Hur använder jag DLT-mottagare?
Obs
- Endast strömmande frågor som använder
spark.readStream
ochdlt.read_stream
stöds. Batchfrågor stöds inte. - Endast
append_flow
kan användas för att skriva till mottagare. Andra flöden, till exempelapply_changes
, stöds inte. - När du kör en fullständig uppdatering tas inte tidigare beräknade resultatdata bort i datakällorna. Det innebär att alla ombearbetade data läggs till i mottagaren och att befintliga data inte ändras.
När händelsedata matas in från en strömmande källa till din DLT-pipeline bearbetar och förfinar du dessa data med hjälp av DLT-funktioner och använder sedan bearbetning av tilläggsflöde för att strömma transformerade dataposter till en DLT-mottagare. Du skapar den här sinken med hjälp av funktionen create_sink()
. För mer information om funktionen create_sink
, se API-referensen för sink .
Använd följande steg för att implementera en DLT-mottagare:
- Konfigurera en DLT-pipeline för att bearbeta strömmande händelsedata och förbereda dataposter för skrivning till en DLT-mottagare.
- Konfigurera och skapa DLT-mottagaren för att använda önskat målmottagareformat.
- Använd ett tilläggsflöde för att skriva de förberedda posterna till målet.
De här stegen beskrivs i resten av ämnet.
Konfigurera en DLT-pipeline för att förbereda register för att skriva till en mottagare
Det första steget är att konfigurera en DLT-pipeline för att omvandla rådata för händelseström till de förberedda data som du skriver till din mottagare.
För att bättre förstå den här processen kan du följa det här exemplet på en DLT-pipeline som bearbetar clickstream-händelsedata från wikipedia-datasets
exempeldata i Databricks. Den här pipelinen parsar rådatauppsättningen för att identifiera Wikipedia-sidor som länkar till en Apache Spark-dokumentationssida och förfinar gradvis dessa data till bara de tabellrader där referenslänken innehåller Apache_Spark.
I det här exemplet är DLT-pipelinen strukturerad med hjälp av medallion-arkitekturen, som organiserar data i olika lager för att förbättra kvaliteten och bearbetningseffektiviteten.
Börja genom att läsa in de råa JSON-posterna från datauppsättningen till ditt bronsskikt med hjälp av Auto Loader. Den här Python-koden visar hur du skapar en strömmande tabell med namnet clickstream_raw
, som innehåller rådata, obearbetade data från källan:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
När den här koden har körts är data nu på nivån "brons" (eller "rådata") i Medallion-arkitekturen och måste rensas. Nästa steg förfinar data till nivån "silver", vilket innebär att rensa datatyper och kolumnnamn och använda DLT-förväntningar för att säkerställa dataintegriteten.
Följande kod visar hur du gör detta genom att rensa och verifiera bronslagerdata i clickstream_clean
silvertabell:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Om du vill utveckla guldlagret i pipeline-strukturen filtrerar du den rensade klickströmsdatan för att isolera poster där den refererande sidan är Apache_Spark
. I det sista kodexemplet väljer du bara de kolumner som krävs för att skriva till målmottagarens tabell.
Följande kod visar hur du skapar en tabell med namnet spark_referrers
som representerar det guldfärgade lagret:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
När dataförberedelseprocessen har slutförts måste du konfigurera målmottagare där de rensade posterna ska skrivas in.
Konfigurera en DLT-mottagare
Databricks har stöd för tre typer av målenheter dit du skriver dina poster som har bearbetats från dina dataströmmar.
- Deltatabell-sänkor
- Apache Kafka-mottagare
- Azure Event Hubs-sänkor
Nedan visas exempel på konfigurationer för Delta-, Kafka- och Azure Event Hubs-mottagare:
Deltamottagare
Så här skapar du en Delta-sänka via filsökväg:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Så här skapar du en Delta-mottagare efter tabellnamn med hjälp av en fullständigt kvalificerad katalog- och schemasökväg:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Kafka- och Azure Event Hubs-utsläpp
Den här koden fungerar för både Apache Kafka- och Azure Event Hubs-sänkor.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Nu när din mottagare har konfigurerats och DLT-pipelinen har förberetts kan du börja strömma bearbetade poster till mottagaren.
Skriva till en DLT-mottagare med ett tilläggsflöde
När mottagaren är konfigurerad är nästa steg att skriva bearbetade poster till den genom att ange den som mål för posters utdata via ett tilläggsflöde. Du gör detta genom att ange ditt handfat som värdet target
i dekoratören append_flow
.
- För hanterade och externa unity-katalogtabeller använder du formatet
delta
och anger sökvägen eller tabellnamnet i alternativen. Dina DLT-pipelines måste konfigureras för att använda Unity Catalog. - För Apache Kafka-ämnen använder du formatet
kafka
och anger ämnesnamn, anslutningsinformation och autentiseringsinformation i alternativen. Det här är samma alternativ som en Spark Structured Streaming Kafka-mottagare stöder. Se Konfigurera Kafka Structured Streaming-skrivaren. - För Azure Event Hubs använder du formatet
kafka
och anger händelsehubbarnas namn, anslutningsinformation och autentiseringsinformation i alternativen. Det här är samma alternativ som stöds i en Spark Structured Streaming Event Hubs-mottagare som använder Kafka-gränssnittet. Se autentisering av tjänstens huvudanvändare med Microsoft Entra ID och Azure Event Hubs. - För Hive-metaarkivtabeller använder du formatet
delta
och anger sökvägen eller tabellnamnet i alternativen. Dina DLT-pipelines måste konfigureras för att använda Hive-metaarkivet.
Nedan visas exempel på hur du konfigurerar flöden för att skriva till Delta-, Kafka- och Azure Event Hubs-mottagare med poster som bearbetas av din DLT-pipeline.
Delta-handfat
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka- och Azure Event Hubs-sänkor
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Parametern value
är obligatorisk för en Azure Event Hubs-mottagare. Ytterligare parametrar som key
, partition
, headers
och topic
är valfria.
Mer information om dekoratören append_flow
finns i avsnittet "Använd tilläggsflödet för att skriva till en strömmande tabell från flera källströmmar".
begränsningar
Endast Python-API:et stöds. SQL stöds inte.
Endast strömmande frågor som använder
spark.readStream
ochdlt.read_stream
stöds. Batchfrågor stöds inte.Endast
append_flow
kan användas för att skriva till mottagare. Andra flöden, till exempelapply_changes
, stöds inte och du kan inte använda en mottagare i en DLT-datauppsättningsdefinition. Följande stöds till exempel inte:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
För Delta-sinkar måste tabellnamnet vara fullständigt kvalificerat. För Unity Catalog hanterade externa tabeller måste tabellnamnet vara i formatet
<catalog>.<schema>.<table>
. För Hive-metastore måste det vara i formatet<schema>.<table>
.Om du kör
FullRefresh
rensas inte tidigare beräknade resultatdata i mottagare. Det innebär att alla ombearbetade data läggs till i mottagaren och att befintliga data inte ändras.DLT-förväntningar stöds inte.