Bagikan melalui


Menggunakan Azure Event Hubs sebagai sumber data DLT

Artikel ini menjelaskan cara menggunakan DLT untuk memproses pesan dari Azure Event Hubs. Anda tidak dapat menggunakan konektor Structured Streaming Event Hubs karena pustaka ini tidak tersedia sebagai bagian dari Databricks Runtime, dan DLT tidak memungkinkan Anda menggunakan pustaka JVM pihak ketiga.

Bagaimana DLT dapat tersambung ke Azure Event Hubs?

Azure Event Hubs menyediakan titik akhir yang kompatibel dengan Apache Kafka yang dapat Anda gunakan dengan konektor Structured Streaming Kafka, tersedia di Databricks Runtime, untuk memproses pesan dari Azure Event Hubs. Untuk informasi selengkapnya tentang Azure Event Hubs dan kompatibilitas Apache Kafka, lihat Menggunakan Azure Event Hubs dari aplikasi Apache Kafka.

Langkah-langkah berikut menjelaskan menyambungkan alur DLT ke instans Azure Event Hubs yang ada dan mengambil event dari suatu topik. Untuk menyelesaikan langkah-langkah ini, Anda memerlukan nilai koneksi Azure Event Hubs berikut:

  • Nama namespace layanan Azure Event Hubs.
  • Nama instans Event Hub di namespace Event Hubs.
  • Nama kebijakan akses bersama dan kunci kebijakan untuk Event Hubs. Secara default, kebijakan RootManageSharedAccessKey dibuat untuk setiap namespace layanan Azure Event Hubs. Kebijakan ini memiliki izin manage, send, dan listen. Jika alur Anda hanya membaca dari Azure Event Hubs, Databricks merekomendasikan untuk membuat kebijakan baru hanya dengan izin mendengarkan.

Untuk informasi selengkapnya tentang string koneksi Azure Event Hubs, lihat Mendapatkan string koneksi Azure Event Hubs.

Nota

  • Azure Event Hubs menyediakan opsi OAuth 2.0 dan tanda tangan akses bersama (SAS) untuk mengotorisasi akses ke sumber daya aman Anda. Instruksi ini menggunakan autentikasi berbasis SAS.
  • Jika Anda mendapatkan string koneksi Azure Event Hubs dari portal Microsoft Azure, string tersebut mungkin tidak berisi nilai EntityPath. Nilai EntityPath hanya diperlukan saat menggunakan konektor Structured Streaming Event Hubs. Menggunakan Konektor Kafka Streaming Terstruktur hanya memerlukan nama topik.

Menyimpan kunci kebijakan dalam rahasia Azure Databricks

Karena kunci kebijakan adalah informasi sensitif, Databricks merekomendasikan untuk tidak mengodekan nilai dalam kode alur Anda. Sebagai gantinya, gunakan rahasia Azure Databricks untuk menyimpan dan mengelola akses ke kunci.

Contoh berikut menggunakan Databricks CLI untuk membuat cakupan rahasia dan menyimpan kunci dalam cakupan rahasia tersebut. Dalam kode alur Anda, gunakan fungsi dbutils.secrets.get() dengan scope-name dan shared-policy-name untuk mengambil nilai kunci.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Untuk informasi selengkapnya tentang rahasia Azure Databricks, lihat Manajemen rahasia.

Membuat buku catatan dan menambahkan kode alur untuk memproses acara

Contoh ini membaca peristiwa IoT dari topik, namun Anda bisa menyesuaikan contoh ini untuk persyaratan aplikasi Anda. Sebagai praktik terbaik, Databricks merekomendasikan penggunaan pengaturan alur DLT untuk mengonfigurasi variabel aplikasi. Kode alur Anda kemudian menggunakan fungsi spark.conf.get() untuk mengambil nilai. Untuk informasi selengkapnya tentang penggunaan pengaturan pipeline agar dapat memparameterkan alur kerja Anda, lihat Menggunakan parameter dengan alur DLT.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Membuat alur

Buat alur baru dengan pengaturan berikut, ganti nilai tempat penampung dengan nilai yang sesuai untuk lingkungan Anda.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Mengganti

  • <container-name> sebagai nama dari kontainer pada akun penyimpanan Azure.
  • <storage-account-name> dengan nama akun penyimpanan ADLS Gen2.
  • <eh-namespace> dengan nama namespace Event Hubs Anda.
  • <eh-policy-name> dengan kunci lingkup rahasia untuk kunci kebijakan Event Hubs.
  • <eventhub> dengan nama instans Event Hubs Anda.
  • <secret-scope-name> dengan nama cakupan rahasia Azure Databricks yang berisi kunci kebijakan Event Hubs.

Sebagai praktik terbaik, alur ini tidak menggunakan jalur penyimpanan DBFS default tetapi menggunakan akun penyimpanan Azure Data Lake Storage Gen2 (ADLS Gen2). Untuk informasi selengkapnya tentang mengonfigurasi autentikasi untuk akun penyimpanan ADLS Gen2, lihat Mengakses kredensial penyimpanan dengan aman dengan rahasia dalam alur.