Azure Event Hubs를 DLT 데이터 원본으로 사용
이 문서에서는 DLT를 사용하여 Azure Event Hubs의 메시지를 처리하는 방법을 설명합니다. Databricks 런타임의 일부로 제공되지 않기 때문에 Structured Streaming Event Hubs 커넥터 를 사용할 수 없으며, DLT 에서는 타사 JVM 라이브러리사용이 허용되지 않습니다.
DLT는 Azure Event Hubs에 어떻게 연결할 수 있나요?
Azure Event Hubs는 Databricks Runtime에서 사용할 수 있는 구조적 스트리밍 Kafka 커넥터사용하여 Azure Event Hubs에서 메시지를 처리하는 데 사용할 수 있는 Apache Kafka와 호환되는 엔드포인트를 제공합니다. Azure Event Hubs 및 Apache Kafka 호환성에 대한 자세한 내용은 Apache Kafka 애플리케이션에서 Azure Event Hubs 사용참조하세요.
다음 단계에서는 DLT 파이프라인을 기존 Event Hubs 인스턴스에 연결하고 토픽에서 이벤트를 사용하는 방법을 설명합니다. 이러한 단계를 완료하려면 다음 Event Hubs 연결 값이 필요합니다.
- Event Hubs 네임스페이스의 이름입니다.
- Event Hubs 네임스페이스에 있는 Event Hub 인스턴스의 이름입니다.
- Event Hubs에 대한 공유 액세스 정책 이름 및 정책 키입니다. 기본적으로 각 Event Hubs 네임스페이스에 대한
RootManageSharedAccessKey
정책이 만들어집니다. 이 정책에는manage
,send
및listen
권한이 있습니다. 파이프라인이 Event Hubs에서만 읽는 경우 Databricks는 수신 권한으로만 새 정책을 만드는 것이 좋습니다.
Event Hubs 연결 문자열에 대한 자세한 내용은 Event Hubs 연결 문자열 가져오기참조하세요.
메모
- Azure Event Hubs는 보안 리소스에 대한 액세스 권한을 부여하는 OAuth 2.0 및 SAS(공유 액세스 서명) 옵션을 모두 제공합니다. 이러한 지침은 SAS 기반 인증을 사용합니다.
- Azure Portal에서 Event Hubs 연결 문자열을 가져오는 경우
EntityPath
값이 포함되지 않을 수 있습니다.EntityPath
값은 구조적 스트리밍 Event Hubs 커넥터를 사용하는 경우에만 필요합니다. 구조적 스트리밍 Kafka 커넥터를 사용하려면 토픽 이름만 제공해야 합니다.
Azure Databricks 비밀에 정책 키 저장
정책 키는 중요한 정보이므로 Databricks는 파이프라인 코드의 값을 하드 코딩하지 않는 것이 좋습니다. 대신 Azure Databricks 비밀을 사용하여 키에 대한 액세스를 저장하고 관리합니다.
다음 예제에서는 Databricks CLI를 사용하여 비밀 범위를 만들고 해당 비밀 범위에 키를 저장합니다. 파이프라인 코드에서 dbutils.secrets.get()
함수를 scope-name
및 shared-policy-name
와 함께 사용하여 키 값을 검색합니다.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Azure Databricks 비밀에 대한 자세한 내용은 비밀 관리참조하세요.
노트북을 생성하고 이벤트를 처리하기 위해 파이프라인 코드를 추가하세요.
다음 예제에서는 토픽에서 IoT 이벤트를 읽지만 애플리케이션 요구 사항에 맞게 예제를 조정할 수 있습니다. Databricks는 DLT 파이프라인 설정을 사용하여 애플리케이션 변수를 구성하는 것이 가장 좋습니다. 그런 다음 파이프라인 코드는 spark.conf.get()
함수를 사용하여 값을 검색합니다. 파이프라인 설정을 사용하여 파이프라인을 매개변수화하는 방법에 대한 자세한 내용은 DLT 파이프라인에 매개변수를 사용하는 방법을 참조하십시오.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
파이프라인 만들기
다음 설정을 사용하여 새 파이프라인을 만들고 자리 표시자 값을 환경에 적절한 값으로 바꿉니다.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
갈다
- Azure 스토리지 계정 컨테이너의 이름인
<container-name>
. - ADLS Gen2 스토리지 계정의 이름으로
<storage-account-name>
을 사용합니다. -
<eh-namespace>
을(를) Event Hubs 네임스페이스의 이름으로 사용하십시오. -
<eh-policy-name>
는 Event Hubs 정책 키의 비밀 범위 키를 사용합니다. -
<eventhub>
을(를) Event Hubs 인스턴스의 이름으로 입력하세요. -
<secret-scope-name>
는 Event Hubs 정책 키를 포함하는 Azure Databricks 비밀 범위의 이름입니다.
이 파이프라인은 기본 DBFS 스토리지 경로를 사용하지 않고 대신 ADLS Gen2(Azure Data Lake Storage Gen2) 스토리지 계정을 사용하는 것이 가장 좋습니다. ADLS Gen2 스토리지 계정에 대한 인증을 구성하는 방법에 대한 자세한 내용은 파이프라인 비밀을 사용하여 스토리지 자격 증명에 안전하게 액세스하는참조하세요.