Aracılığıyla paylaş


DLT havuzları ile kayıtları dış hizmetlere akışla aktar

Önemli

DLT sink API Genel Önizlemeaşamasındadır.

Bu makalede DLT sink API'sinin yanı sıra DLT akışlarıyla nasıl kullanılacağı unity kataloğu yönetilen ve dış tablolar, Hive meta veri deposu tabloları ve Apache Kafka veya Azure Event Hubs gibi olay akış hizmetleri gibi bir dış veri havuzuna işlem hattı tarafından dönüştürülen kayıtları yazmak için nasıl kullanılacağı açıklanır.

DLT havuzları nedir?

DLT havuzları, Apache Kafka veya Azure Event Hubs gibi olay akış hizmetleri ve Unity Kataloğu veya Hive meta veri deposu tarafından yönetilen dış tablolar gibi hedeflere dönüştürülmüş veriler yazmanızı sağlar. Daha önce DLT işlem hattında oluşturulan akış tabloları ve gerçekleştirilmiş görünümler yalnızca Azure Databricks tarafından yönetilen Delta tablolarında kalıcı hale getirilebiliyordu. Havuzları kullanarak artık DLT işlem hatlarınızın çıkışını kalıcı hale döndürmek için daha fazla seçeneğiniz vardır.

DLT havuzlarını ne zaman kullanmalıyım?

Databricks, aşağıdakilere ihtiyacınız varsa DLT havuzlarını kullanmanızı önerir:

  • Sahtekarlık algılama, gerçek zamanlı analiz ve müşteri önerileri gibi operasyonel bir kullanım örneği oluşturun. operasyonel kullanım örnekleri genellikle Apache Kafka konusu gibi bir ileti veri yolu verilerini okur ve ardından düşük gecikme süresiyle verileri işler ve işlenen kayıtları bir ileti veri yolu'na geri yazar. Bu yaklaşım, bulut depolama alanından yazmayarak veya okumayarak daha düşük gecikme süresi elde etmenizi sağlar.
  • DLT akışlarınızdaki dönüştürülmüş verileri Unity Kataloğu tarafından yönetilen ve dış tablolar ile Hive meta veri deposu tabloları da dahil olmak üzere dış Delta örneği tarafından yönetilen tablolara yazın.
  • Apache Kafka konuları gibi Databricks dışındaki havuzlara ters ayıklama-dönüştürme-yükleme (ETL) gerçekleştirin. Bu yaklaşım, Unity Kataloğu tablolarının veya Databricks tarafından yönetilen diğer depolama alanlarının dışında verilerin okunması veya kullanılması gereken kullanım örneklerini etkili bir şekilde desteklemenizi sağlar.

DLT havuzlarını nasıl kullanabilirim?

Not

  • Yalnızca spark.readStream ve dlt.read_stream kullanan akış sorguları desteklenir. Batch sorguları desteklenmez.
  • Lavabolara yazmak için yalnızca append_flow kullanılabilir. apply_changesgibi diğer akışlar desteklenmez.
  • tam yenileme güncelleştirmesi çalıştırmak, havuzlardaki daha önce hesaplanan sonuç verilerini temizlemez. Bu, yeniden işlenmiş verilerin havuza eklendiği ve mevcut verilerin değiştirilmeyeceği anlamına gelir.

Olay verileri bir akış kaynağından DLT işlem hattınıza alınırken, DLT işlevselliğini kullanarak bu verileri işleyip daraltıp ardından dönüştürülmüş veri kayıtlarını DLT havuzuna akışla aktarmak için ekleme akışı işlemeyi kullanırsınız. Bu havuzu create_sink() fonksiyonunu kullanarak oluşturursunuz. create_sink işlevini kullanma hakkında daha fazla ayrıntı için, API referansınabakın.

DLT havuzu uygulamak için aşağıdaki adımları kullanın:

  1. Akış olayı verilerini işlemek ve veri kayıtlarını DLT havuzuna yazmak üzere hazırlamak için bir DLT işlem hattı ayarlayın.
  2. Tercih edilen hedef havuz biçimini kullanmak için DLT havuzu yapılandırın ve oluşturun.
  3. Hazırlanan kayıtları havuza yazmak için ekleme akışı kullanın.

Bu adımlar, konunun geri kalanında ele alınmıştır.

Kayıtları havuza yazmaya hazırlamak için DLT işlem hattı ayarlama

İlk adım, ham olay akışı verilerini havuzunuza yazacağınız hazırlanmış verilere dönüştürmek için bir DLT işlem hattı ayarlamaktır.

Bu işlemi daha iyi anlamak için Databricks'teki wikipedia-datasets örnek verilerinden tıklama akışı olay verilerini işleyen bu DLT işlem hattı örneğini izleyebilirsiniz. Bu işlem hattı, Apache Spark belge sayfasına bağlanan Wikipedia sayfalarını tanımlamak için ham veri kümesini ayrıştırır ve bu verileri yalnızca başvuran bağlantının içerdiği tablo satırlarına aşamalı olarak daraltır Apache_Spark.

Bu örnekte DLT işlem hattı, kalite ve işleme verimliliğini artırmak için verileri farklı katmanlar halinde düzenleyen madalyon mimarisikullanılarak yapılandırılmıştır.

Başlamak için, Otomatik Yükleyicikullanarak veri kümesindeki ham JSON kayıtlarını bronz katmanınıza yükleyin. Bu Python kodu, kaynaktan ham, işlenmemiş verileri içeren clickstream_rawadlı bir akış tablosunun nasıl oluşturulacağını gösterir:

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

Bu kod çalıştırıldıktan sonra veriler artık Madalyon mimarisinin "bronz" (veya "ham veri") düzeyindedir ve temizlenmesi gerekir. Bir sonraki adım, veri türlerini ve sütun adlarını temizlemeyi ve veri bütünlüğünü sağlamak için DLT beklentilerini kullanmayı içeren verileri "silver" düzeyine daraltmaktadır.

Aşağıdaki kod, bronz katman verilerini temizleyip clickstream_clean gümüş tabloda doğrulayarak bunun nasıl yapılacağını gösterir:

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

İşlem hattı yapınızın "altın" katmanını oluşturmak için, temizlenen tıklama akışı verilerini filtreleyip yönlendiren sayfanın Apache_Sparkolduğu girişleri yalıtabilirsiniz. Bu son kod örneğinde, yalnızca hedef havuz tablonuza yazmak için gereken sütunları seçersiniz.

Aşağıdaki kod, altın katmanı temsil eden spark_referrers adlı bir tablonun nasıl oluşturulacağını gösterir:

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

Bu veri hazırlama işlemi tamamlandıktan sonra, temizlenen kayıtların yazılacağı hedef havuzları yapılandırmanız gerekir.

DLT havuzu yapılandırma

Databricks, akış verilerinizden işlenen kayıtlarınızı yazdığınız üç tür hedef havuzu destekler:

  • Delta masa lavaboları
  • Apache Kafka çıkışları
  • Azure Event Hubs hedefleri

Aşağıda Delta, Kafka ve Azure Event Hubs havuzları için yapılandırma örnekleri verilmiştir:

Delta havuzları

Dosya yoluna göre Delta havuzu oluşturmak için:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Tam katalog ve şema yolu kullanarak tablo adına göre delta havuzu oluşturmak için:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

Kafka ve Azure Event Hubs havuzları

Bu kod hem Apache Kafka hem de Azure Event Hubs havuzları için çalışır.

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

Artık havuzunuz yapılandırıldığına ve DLT işlem hattınızın hazır olduğuna göre, işlenen kayıtları havuza akışla aktarmaya başlayabilirsiniz.

ekleme akışıyla DLT havuzuna yazma

Lavabonuz yapılandırıldıktan sonra, sonraki adım işlenen kayıtları, bir ekleme akışı tarafından üretilen kayıtların çıktısı için hedef olarak belirleyerek buna yazmaktır. Bunu yapmak için havuzunuzu append_flow dekoratördeki target değeri olarak belirtirsiniz.

  • Unity Kataloğu yönetilen ve dış tablolar için delta biçimini kullanın ve seçeneklerde yolu veya tablo adını belirtin. DLT işlem hatlarınızın Unity Kataloğu'nu kullanacak şekilde yapılandırılması gerekir.
  • Apache Kafka konuları için kafka biçimini kullanın ve seçeneklerde konu adını, bağlantı bilgilerini ve kimlik doğrulama bilgilerini belirtin. Bunlar Spark Yapılandırılmış Akış Kafka havuzu tarafından desteklenen seçeneklerle aynıdır. Bkz. Kafka Yapılandırılmış Akış yazıcısını yapılandırma.
  • Azure Event Hubs için kafka biçimini kullanın ve seçeneklerde Event Hubs adını, bağlantı bilgilerini ve kimlik doğrulama bilgilerini belirtin. Bunlar, Kafka arabirimini kullanan Spark Yapılandırılmış Akış Event Hubs havuzu için desteklenen seçeneklerle aynıdır. Bkz. Microsoft Entra ID ve Azure Event Hubs ileHizmet Sorumlusu kimlik doğrulaması.
  • Hive meta veri deposu tabloları için delta biçimini kullanın ve seçeneklerde yolu veya tablo adını belirtin. DLT işlem hatlarınızın Hive meta veri depolarını kullanacak şekilde yapılandırılması gerekir.

Aşağıda, DLT işlem hattınız tarafından işlenen kayıtlarla Delta, Kafka ve Azure Event Hubs havuzlarına yazacak akışların nasıl ayarlanacağına yönelik örnekler verilmiştir.

Delta havuzu

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Kafka ve Azure Event Hubs çıkışları

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

Azure Event Hubs havuzu için value parametresi zorunludur. key, partition, headersve topic gibi ek parametreler isteğe bağlıdır.

append_flow dekoratörü hakkında daha fazla ayrıntı için bkz. Birden çok kaynak akışından akış tablosuna yazmak için ekleme akışını kullanma.

Sınırlamalar

  • Yalnızca Python API'sini destekler. SQL desteklenmez.

  • Yalnızca spark.readStream ve dlt.read_stream kullanan akış sorguları desteklenir. Batch sorguları desteklenmez.

  • Lavabolara yazmak için yalnızca append_flow kullanılabilir. apply_changesgibi diğer akışlar desteklenmez ve DLT veri kümesi tanımında havuz kullanamazsınız. Örneğin, aşağıdakiler desteklenmez:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • Delta havuzları için tablo adının tam olarak nitelenmiş olması gerekir. Özellikle, Unity Kataloğu tarafından yönetilen dış tablolar için tablo adının <catalog>.<schema>.<table>biçiminde olması gerekir. Hive meta veri deposu için <schema>.<table>biçiminde olmalıdır.

  • FullRefresh çalıştırılırsa havuzlardaki önceden hesaplanan sonuç verileri temizlenmez. Bu, yeniden işlenmiş verilerin havuza eklendiği ve mevcut verilerin değiştirilmeyeceği anlamına gelir.

  • DLT beklentileri desteklenmiyor.

Kaynaklar