Megosztás a következőn keresztül:


Az Azure Event Hubs használata DLT-adatforrásként

Ez a cikk bemutatja, hogyan használható a DLT az Azure Event Hubsból érkező üzenetek feldolgozására. Nem használhatja a strukturált streamelési eseményközpontok összekötőjét, mert ez a kódtár nem érhető el a Databricks Runtime részeként, és a DLT nem teszi lehetővé külső JVM-kódtárakhasználatát.

Hogyan csatlakozhat a DLT az Azure Event Hubshoz?

Az Azure Event Hubs egy olyan végpontot biztosít, amely kompatibilis az Apache Kafkával, és amelyet a Databricks Runtime-ban elérhető Strukturált Streamelési Kafka-összekötősegítségével használhat az Azure Event Hubs-ból érkező üzenetek feldolgozásához. További információ az Azure Event Hubs és az Apache Kafka kompatibilitásáról: Az Azure Event Hubs használata Apache Kafka-alkalmazásokból.

Az alábbi lépések egy DLT-folyamat meglévő Event Hubs-példányhoz való csatlakoztatását és egy témakör eseményeinek felhasználását ismertetik. A lépések végrehajtásához a következő Event Hubs-kapcsolati értékekre van szükség:

  • Az Event Hubs-névtér neve.
  • Az Event Hubs-névtérben található Event Hub-példány neve.
  • Az Event Hubs megosztott hozzáférési szabályzatának neve és szabályzatkulcsa. Alapértelmezés szerint minden Event Hubs-névtérhez létrejön egy RootManageSharedAccessKey szabályzat. Ez a szabályzat manage, send és listen engedélyekkel rendelkezik. Ha a folyamat csak az Event Hubsból olvas be, a Databricks azt javasolja, hogy hozzon létre egy új szabályzatot, amely csak figyelési engedéllyel rendelkezik.

További információ az Event Hubs kapcsolati karakterláncról: Event Hubs kapcsolati karakterlánc lekérése.

Jegyzet

  • Az Azure Event Hubs az OAuth 2.0 és a közös hozzáférésű jogosultságkód (SAS) beállításait is biztosítja a biztonságos erőforrásokhoz való hozzáférés engedélyezéséhez. Ezek az utasítások SAS-alapú hitelesítést használnak.
  • Ha az Event Hubs kapcsolati karakterláncát az Azure portálról szerzi be, előfordulhat, hogy az nem tartalmazza a EntityPath értéket. A EntityPath érték csak a strukturált streamelési eseményközpontok összekötőjének használatakor szükséges. A Strukturált streamelési Kafka-összekötő használatához csak a témakör nevét kell megadni.

A szabályzatkulcs tárolása Egy Azure Databricks-titkos kódban

Mivel a szabályzatkulcs bizalmas információ, a Databricks azt javasolja, hogy ne írja be az értéket a folyamatkódban. Ehelyett használja az Azure Databricks titkos kulcsait a kulcshoz való hozzáférés tárolására és kezelésére.

Az alábbi példa a Databricks parancssori felületén hoz létre egy titkos hatókört, és ebben a titkos hatókörben tárolja a kulcsot. A folyamatkódban használja a dbutils.secrets.get() függvényt a scope-name és a shared-policy-name segítségével a kulcsérték lekéréséhez.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

További információ az Azure Databricks titkos kulcsokról: Titkos kódok kezelése.

Jegyzetfüzet létrehozása és folyamatkód hozzáadása események felhasználásához

Az alábbi példa beolvassa az IoT-eseményeket egy témakörből, de a példát az alkalmazás követelményeihez igazíthatja. Ajánlott eljárásként a Databricks a DLT-folyamat beállításainak használatát javasolja az alkalmazásváltozók konfigurálásához. A folyamatkód ezután a spark.conf.get() függvénnyel kéri le az értékeket. A folyamatbeállítások folyamatparaméterezésével kapcsolatos további információkért lásd: Paraméterek használata DLT-folyamatokkal.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

A csővezeték létrehozása

Hozzon létre egy új folyamatot a következő beállításokkal, és cserélje le a helyőrző értékeket a környezetének megfelelő értékekre.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Helyettesít

  • <container-name> egy Azure storage-fiók tárolójának a neve.
  • <storage-account-name> egy ADLS Gen2-tárfiók nevével.
  • <eh-namespace> az Event Hubs-névtér nevével.
  • <eh-policy-name> az Event Hubs szabályzatkulcsának titkos hatókörkulcsa.
  • <eventhub> az Event Hubs-példány nevével.
  • <secret-scope-name> az Azure Databricks titkosítási hatókör nevével, amely tartalmazza az Event Hubs szabályzat kulcsát.

Ajánlott eljárásként ez a folyamat nem az alapértelmezett DBFS tárolási útvonalat használja, hanem egy Azure Data Lake Storage Gen2 (ADLS Gen2) tárfiókot használ. További információ az ADLS Gen2-tárfiók hitelesítésének konfigurálásáról: lásd Tároló hitelesítő adatok biztonságos elérése titkosított elemekkel a folyamatban.