다음을 통해 공유


DLT Python 언어 참조

이 문서에는 DLT Python 프로그래밍 인터페이스에 대한 세부 정보가 있습니다.

SQL API에 대한 자세한 내용은 DLT SQL 언어 참조참조하세요.

자동 로더 구성과 관련된 자세한 내용은 자동 로더란?을 참조하세요..

시작하기 전에

다음은 DLT Python 인터페이스를 사용하여 파이프라인을 구현할 때 고려해야 할 중요한 사항입니다.

  • Python table()view() 함수는 파이프라인 업데이트를 계획하고 실행하는 동안 여러 번 호출되므로 부작용이 있을 수 있는 이러한 함수 중 하나에 코드를 포함하지 마세요(예: 데이터를 수정하거나 이메일을 보내는 코드). 예기치 않은 동작을 방지하려면 데이터 세트를 정의하는 Python 함수에 테이블 또는 뷰를 정의하는 데 필요한 코드만 포함되어야 합니다.
  • 특히 데이터 세트를 정의하는 함수에서 전자 메일을 보내거나 외부 모니터링 서비스와 통합하는 등의 작업을 수행하려면 이벤트 후크사용합니다. 데이터 세트를 정의하는 함수에서 이러한 작업을 구현하면 예기치 않은 동작이 발생합니다.
  • Python tableview 함수는 DataFrame을 반환해야 합니다. DataFrames에서 작동하는 일부 함수는 DataFrame을 반환하지 않으며 사용해서는 안 됩니다. 이러한 작업에는 collect(), count(), toPandas(), save()saveAsTable()같은 함수가 포함됩니다. 데이터 프레임 변환은 전체 데이터 흐름 그래프가 확인된 실행되므로 이러한 작업을 사용하면 의도하지 않은 부작용이 발생할 수 있습니다.

dlt Python 모듈 가져오기

DLT Python 함수는 dlt 모듈에 정의됩니다. Python API를 사용하여 구현된 파이프라인은 다음 모듈을 가져와야 합니다.

import dlt

DLT 구체화된 뷰 또는 스트리밍 테이블 만들기

Python에서 DLT는 정의 쿼리를 기반으로 데이터 세트를 구체화된 뷰로 업데이트할지 아니면 스트리밍 테이블로 업데이트할지 결정합니다. @table 데코레이터를 사용하여 구체화된 뷰와 스트리밍 테이블을 모두 정의할 수 있습니다.

Python에서 구체화된 뷰를 정의하려면 데이터 원본에 대해 정적 읽기를 수행하는 쿼리에 @table 적용합니다. 스트리밍 테이블을 정의하려면 데이터 원본에 대해 스트리밍 읽기를 수행하는 쿼리에 @table 적용하거나 create_streaming_table() 함수사용합니다. 두 데이터 세트 형식의 구문 사양은 다음과 같습니다.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

DLT 보기 만들기

Python에서 보기를 정의하려면 @view 데코레이터를 적용합니다. @table 데코레이터와 마찬가지로 정적 또는 스트리밍 데이터 세트에 대해 DLT의 보기를 사용할 수 있습니다. 다음은 Python을 사용하여 뷰를 정의하는 구문입니다.

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

예: 테이블 및 뷰 정의

Python에서 테이블 또는 뷰를 정의하려면 함수에 @dlt.view 또는 @dlt.table 데코레이터를 적용합니다. 함수 이름 또는 name 매개 변수를 사용하여 테이블 또는 뷰 이름을 할당할 수 있습니다. 다음 예제에서는 JSON 파일을 입력 원본으로 사용하는 taxi_raw 뷰와 filtered_data 뷰를 입력으로 사용하는 taxi_raw 테이블이라는 두 가지 데이터 세트를 정의합니다.

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

예: 동일한 파이프라인에 정의된 데이터 세트에 액세스

메모

dlt.read()dlt.read_stream() 함수는 DLT Python 인터페이스에서 계속 사용할 수 있고 완전히 지원되지만 Databricks는 다음과 같은 이유로 항상 spark.read.table()spark.readStream.table() 함수를 사용하는 것이 좋습니다.

  • spark 함수는 외부 스토리지의 데이터 세트 또는 다른 파이프라인에 정의된 데이터 세트를 포함하여 내부 및 외부 데이터 세트를 읽을 수 있습니다. dlt 함수는 내부 데이터 세트 읽기만 지원합니다.
  • spark 함수는 skipChangeCommits같은 옵션을 지정하여 작업을 읽을 수 있도록 지원합니다. 옵션 지정은 dlt 함수에서 지원되지 않습니다.

동일한 파이프라인에 정의된 데이터 세트에 액세스하려면 spark.read.table() 또는 spark.readStream.table() 함수를 사용합니다.

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

메모

파이프라인에서 뷰 또는 테이블을 쿼리할 때 카탈로그와 스키마를 직접 지정하거나 파이프라인에 구성된 기본값을 사용할 수 있습니다. 이 예제에서 customers테이블은 파이프라인에 대해 구성된 기본 카탈로그 및 스키마에서 작성 및 읽습니다.

예: 메타스토어에 등록된 테이블에서 읽기

Hive 메타스토어에 등록된 테이블에서 데이터를 읽으려면 함수 인수에서 테이블 이름을 데이터베이스 이름으로 한정할 수 있습니다.

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Unity 카탈로그 테이블에서 읽는 예제는 Unity 카탈로그 파이프라인에 데이터 수집을 참조하세요.

예제: spark.sql 사용하여 데이터 세트에 액세스

쿼리 함수에서 spark.sql 식을 사용하여 데이터 세트를 반환할 수도 있습니다. 내부 데이터 세트에서 읽으려면 기본 카탈로그와 스키마를 사용하도록 이름을 그대로 두거나, 이름 앞에 카탈로그와 스키마를 추가할 수 있습니다.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

구체화된 뷰 또는 스트리밍 테이블에서 레코드를 영구적으로 삭제

GDPR 준수와 같이 삭제 벡터가 활성화된 구체화된 뷰 또는 스트리밍 테이블에서 레코드를 영구적으로 삭제하려면 개체의 기본 델타 테이블에서 추가 작업을 수행해야 합니다. 레코드가 구체화된 뷰에서 올바르게 삭제되도록 하려면, 삭제 벡터가 활성화된 구체화된 뷰에서 레코드를 영구적으로 삭제하는 방법()을 참조하세요. 스트리밍 테이블에서 레코드를 삭제하려면 스트리밍 테이블에서 레코드를 영구적으로 삭제하는 방법을확인하세요.

DLT sink API를 사용하여 외부 이벤트 스트리밍 서비스 또는 델타 테이블에 쓰기

중요하다

DLT sink API는 공개 프리뷰입니다.

메모

  • 전체 새로 고침 업데이트 실행해도 싱크에서 데이터가 지워지지 않습니다. 다시 처리된 데이터는 싱크에 추가되고 기존 데이터는 변경되지 않습니다.
  • DLT 기대치는 sink API에서 지원되지 않습니다.

Apache Kafka 또는 Azure Event Hubs와 같은 이벤트 스트리밍 서비스 또는 DLT 파이프라인의 델타 테이블에 쓰려면 create_sink() Python 모듈에 포함된 dlt 함수를 사용합니다. create_sink() 함수를 사용하여 싱크를 만든 후, 부가 흐름을(를) 사용하여 싱크에 데이터를 씁니다. 추가 흐름은 create_sink() 함수에서 지원되는 유일한 흐름 형식입니다. apply_changes같은 다른 흐름 형식은 지원되지 않습니다.

다음은 create_sink() 함수를 사용하여 싱크를 만드는 구문입니다.

create_sink(<sink_name>, <format>, <options>)
논쟁
name
형식: str
싱크를 식별하고 싱크를 참조하고 관리하는 데 사용되는 문자열입니다. 싱크 이름은 노트북이나 모듈과 같은 모든 소스 코드를 포함하여, 전체 파이프라인에 걸쳐 고유해야 합니다.
이 매개 변수는 필수입니다.
format
형식: str
출력 형식(kafka 또는 delta)을 정의하는 문자열입니다.
이 매개 변수는 필수입니다.
options
형식: dict
{"key": "value"}형식으로 지정된 선택적 싱크 옵션 목록이며, 키와 값은 모두 문자열입니다. Kafka 및 델타 싱크에서 지원하는 모든 Databricks 런타임 옵션이 지원됩니다. Kafka 옵션은 Kafka 구조적 스트리밍 작성기구성을 참조하세요. Delta 옵션에 대한 내용은 Delta 테이블을 싱크로 참조하세요.

예제: create_sink() 함수를 사용하여 Kafka 싱크 만들기

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

예제: create_sink() 함수 및 파일 시스템 경로를 사용하여 델타 싱크 만들기

다음 예제에서는 테이블에 파일 시스템 경로를 전달하여 Delta 테이블에 쓰는 싱크를 만듭니다.

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

예제: create_sink() 함수와 Unity 카탈로그 테이블 이름을 사용하여 델타 싱크 만들기

메모

델타 싱크는 Unity 카탈로그 외부 및 관리 테이블 및 Hive 메타스토어 관리 테이블을 지원합니다. 테이블 이름은 완전하게 지정되어야 합니다. 예를 들어 Unity 카탈로그 테이블은 <catalog>.<schema>.<table>3계층 식별자를 사용해야 합니다. Hive 메타스토어 테이블은 <schema>.<table>사용해야 합니다.

다음 예제에서는 Unity 카탈로그에서 테이블의 이름을 전달하여 델타 테이블에 쓰는 싱크를 만듭니다.

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

예제: 추가 흐름을 사용하여 델타 싱크에 쓰기

다음 예제에서는 Delta 테이블에 쓰는 싱크를 만든 다음 해당 싱크에 쓸 추가 흐름을 만듭니다.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

예제: 추가 흐름을 사용하여 Kafka 싱크에 기록하기

다음 예제에서는 Kafka 토픽에 쓰는 싱크를 만든 다음 해당 싱크에 쓸 추가 흐름을 만듭니다.

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Kafka에 기록된 DataFrame의 스키마에는 Kafka 구조적 스트리밍 작성기구성에 지정된 열이 포함되어야 합니다.

스트리밍 작업의 대상으로 사용할 테이블 만들기

create_streaming_table() 함수를 사용하여 스트리밍 작업에서 출력된 apply_changes(), apply_changes_from_snapshot()@append_flow 레코드를 위한 대상 테이블을 만드세요.

메모

create_target_table()create_streaming_live_table() 함수는 더 이상 사용되지 않습니다. Databricks는 create_streaming_table() 함수를 사용하도록 기존 코드를 업데이트하는 것이 좋습니다.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
논쟁
name
형식: str
테이블 이름입니다.
이 매개 변수는 필수입니다.
comment
형식: str
테이블에 대한 선택적 설명입니다.
spark_conf
형식: dict
이 쿼리를 실행하기 위한 Spark 구성의 선택적 목록입니다.
table_properties
형식: dict
테이블을 위한 테이블 속성의 선택적 목록입니다.
partition_cols
형식: array
테이블을 분할하는 데 사용할 하나 이상의 열의 선택적 목록입니다.
cluster_by
형식: array
필요에 따라 테이블에서 액체 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다.
참조: Delta 테이블에 액체 클러스터링 사용.
path
형식: str
테이블 데이터의 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다.
schema
형식: str 또는 StructType
테이블에 대한 선택적 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python으로 정의할 수 있습니다.
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail
형식: dict
테이블에 대한 선택적 데이터 품질 제약 조건입니다. 여러 기대 사항을 참조하세요.
row_filter(공개 미리 보기)
형식: str
테이블의 행 필터 옵션입니다. 행 필터 및 열 마스크가 있는 테이블 게시를 참조하세요.

테이블 생성 방식 제어

테이블은 또한 구체화에 대한 추가적인 제어 기능을 제공합니다.

  • 를 사용하여 cluster_by 테이블을 지정하는 방법. Liquid 클러스터링을 사용하여 쿼리 속도를 높일 수 있습니다. 참조: Delta 테이블에 액체 클러스터링 사용.
  • 를 사용하여 테이블 partition_cols로 분할하는 방법을 지정합니다.
  • 뷰 또는 테이블을 정의할 때 테이블 속성을 설정할 수 있습니다. DLT 테이블 속성참조하세요.
  • path 설정을 사용하여 테이블 데이터의 스토리지 위치를 설정합니다. 기본적으로 테이블 데이터는 path 설정되지 않은 경우 파이프라인 스토리지 위치에 저장됩니다.
  • 스키마 정의에서 생성된 열 사용할 수 있습니다. 예제: 스키마 및 클러스터 열을 지정하는 방법을 참조하세요.

메모

크기가 1TB 미만인 테이블의 경우 Databricks는 DLT가 데이터 조직을 제어하도록 하는 것이 좋습니다. 테이블이 테라바이트 이상으로 증가할 것으로 예상하지 않는 한 파티션 열을 지정해서는 안 됩니다.

예제: 스키마 및 클러스터 열 지정

선택적으로 Python StructType 또는 SQL DDL 문자열을 사용하여 테이블 스키마를 지정할 수 있습니다. DDL 문자열과 함께 지정될 경우, 정의는 생성된 열 를 포함할 수 있습니다.

다음 예제에서는 Python sales사용하여 지정된 스키마를 사용하여 StructType 테이블을 만듭니다.

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

다음 예제에서는 DDL 문자열을 사용하여 테이블에 대한 스키마를 지정하고, 생성된 열을 정의하고, 클러스터링 열을 정의합니다.

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

기본적으로 스키마를 지정하지 않으면 DLT는 table 정의에서 스키마를 유추합니다.

예제: 파티션 열 지정

다음 예제에서는 DDL 문자열을 사용하여 테이블에 대한 스키마를 지정하고, 생성된 열을 정의하고, 파티션 열을 정의합니다.

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

예: 테이블 제약 조건 정의

중요하다

테이블 제약 조건은 공개 미리 보기에 있습니다.

스키마를 지정할 때 기본 키와 외장 키를 정의할 수 있습니다. 제약 조건은 정보 제공이며 적용되지 않습니다. SQL 언어 참조의 CONSTRAINT 절 참조하세요.

다음 예제에서는 기본 및 외래 키 제약 조건이 있는 테이블을 정의합니다.

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

예: 행 필터 및 열 마스크 정의

중요하다

행 필터 및 열 마스크는 공개 미리 보기에 있습니다.

행 필터 및 열 마스크를 사용하여 구체화된 뷰 또는 스트리밍 테이블을 만들려면 ROW FILTER 절MASK 절를 사용하면 됩니다. 다음 예제에서는 행 필터와 열 마스크를 모두 사용하여 구체화된 뷰와 스트리밍 테이블을 정의하는 방법을 보여 줍니다.

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

행 필터 및 열 마스크에 대한 자세한 내용은 행 필터와 열 마스크를 사용하는 테이블 게시를 참조하세요.

원본 스트리밍 테이블의 변경 내용을 무시하도록 스트리밍 테이블 구성

메모

  • skipChangeCommits 플래그는 spark.readStream 함수를 사용할 때만 option()과 함께 작동합니다. dlt.read_stream() 함수에서는 이 플래그를 사용할 수 없습니다.
  • 원본 스트리밍 테이블이 skipChangeCommits 함수의 대상으로 정의되면 플래그를 사용할 수 없습니다.

기본적으로 스트리밍 테이블에는 추가 전용 원본이 필요합니다. 스트리밍 테이블이 다른 스트리밍 테이블을 원본으로 사용하고 원본 스트리밍 테이블에 업데이트 또는 삭제가 필요한 경우(예: GDPR "잊혀질 권리" 처리) 원본 스트리밍 테이블을 읽을 때 해당 변경 내용을 무시하도록 skipChangeCommits 플래그를 설정할 수 있습니다. 이 플래그에 대한 자세한 내용은 업데이트 무시 및삭제를 참조하세요.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Python DLT 속성

다음 표에서는 DLT를 사용하여 테이블 및 뷰를 정의하는 동안 지정할 수 있는 옵션 및 속성에 대해 설명합니다.

@table 또는 @view
name
형식: str
테이블 또는 뷰의 선택적 이름입니다. 정의되지 않은 경우 함수 이름이 테이블 또는 뷰 이름으로 사용됩니다.
comment
형식: str
테이블에 대한 선택적 설명입니다.
spark_conf
형식: dict
이 쿼리를 실행하기 위한 Spark 구성의 선택적 목록입니다.
table_properties
형식: dict
테이블을 위한 테이블 속성의 선택적 목록입니다.
path
형식: str
테이블 데이터의 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다.
partition_cols
형식: a collection of str
테이블 분할에 사용할 하나 이상의 열을 포함할 수 있는 선택적 컬렉션(예: list)입니다.
cluster_by
형식: array
필요에 따라 테이블에서 액체 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다.
참조: Delta 테이블에 액체 클러스터링 사용.
schema
형식: str 또는 StructType
테이블에 대한 선택적 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python StructType사용하여 정의할 수 있습니다.
temporary
형식: bool
테이블을 만들지만 테이블에 대한 메타데이터는 게시하지 않습니다. temporary 키워드는 파이프라인에서 사용할 수 있지만 파이프라인 외부에서 액세스해서는 안 되는 테이블을 만들도록 DLT에 지시합니다. 처리 시간을 줄이기 위해 임시 테이블은 단일 업데이트가 아니라 해당 테이블을 만드는 파이프라인의 수명 동안 유지됩니다.
기본값은 'False'입니다.
row_filter(공개 미리 보기)
형식: str
테이블에 대한 선택적 행 필터 절입니다. 행 필터와 열 마스크가 있는 테이블 게시를 참조하세요.
테이블 또는 뷰 정의
def <function-name>()
데이터 세트를 정의하는 Python 함수입니다. name 매개 변수가 설정되지 않은 경우 <function-name> 대상 데이터 세트 이름으로 사용됩니다.
query
Spark 데이터 세트 또는 Koalas DataFrame을 반환하는 Spark SQL 문입니다.
dlt.read() 또는 spark.read.table() 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 전체 읽기를 수행합니다. 외부 데이터 세트를 읽으려면 spark.read.table() 함수를 사용합니다. dlt.read() 사용하여 외부 데이터 세트를 읽을 수 없습니다. spark.read.table() 사용하여 현재 파이프라인 외부에서 정의된 내부 데이터 세트, 데이터 세트를 읽을 수 있고 데이터 읽기 옵션을 지정할 수 있으므로 Databricks는 dlt.read() 함수 대신 사용하는 것이 좋습니다.
파이프라인에서 데이터 세트를 정의할 때 기본적으로 파이프라인 구성에 정의된 카탈로그 및 스키마를 사용합니다. spark.read.table() 함수를 사용하여 자격 없이 파이프라인에 정의된 데이터 세트에서 읽을 수 있습니다. 예를 들어 customers데이터 세트에서 읽으려면 다음을 수행합니다.
spark.read.table("customers")
spark.read.table() 함수를 사용하여 선택적으로 테이블 이름을 데이터베이스 이름으로 한정하여 metastore에 등록된 테이블에서 읽을 수도 있습니다.
spark.read.table("sales.customers")
dlt.read_stream() 또는 spark.readStream.table()을 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 스트리밍 읽기를 수행하십시오. 외부 데이터 세트에서 스트리밍 읽기를 수행하려면 다음을 사용하세요.
spark.readStream.table() 함수입니다. spark.readStream.table() 사용하여 현재 파이프라인 외부에서 정의된 내부 데이터 세트, 데이터 세트를 읽을 수 있고 데이터 읽기 옵션을 지정할 수 있으므로 Databricks는 dlt.read_stream() 함수 대신 사용하는 것이 좋습니다.
SQL 구문을 사용하여 DLT table 함수에서 쿼리를 정의하려면 spark.sql 함수를 사용합니다. 예제: spark.sql사용하여 데이터 세트에 액세스합니다. Python을 사용하여 DLT table 함수에서 쿼리를 정의하려면 PySpark 구문을 사용합니다.
기대
@expect("description", "constraint")
식별된 데이터 품질 제약 조건을 선언하십시오.
description. 행이 예상을 위반하는 경우 대상 데이터 세트에 행을 포함합니다.
@expect_or_drop("description", "constraint")
식별된 데이터 품질 제약 조건을 선언하십시오.
description. 행이 예상을 위반하는 경우 대상 데이터 세트에서 행을 삭제합니다.
@expect_or_fail("description", "constraint")
식별된 데이터 품질 제약 조건을 선언하십시오.
description. 행이 예상을 위반하는 경우 즉시 실행을 중지합니다.
@expect_all(expectations)
하나 이상의 데이터 품질 제약 조건을 선언합니다.
expectations Python 사전입니다. 여기서 키는 예상 설명이고 값은 예상 제약 조건입니다. 행이 예상을 위반하는 경우 대상 데이터 세트에 행을 포함합니다.
@expect_all_or_drop(expectations)
하나 이상의 데이터 품질 제약 조건을 선언합니다.
expectations Python 사전입니다. 여기서 키는 예상 설명이고 값은 예상 제약 조건입니다. 행이 예상을 위반하는 경우 대상 데이터 세트에서 행을 삭제합니다.
@expect_all_or_fail(expectations)
하나 이상의 데이터 품질 제약 조건을 선언합니다.
expectations Python 사전입니다. 여기서 키는 예상 설명이고 값은 예상 제약 조건입니다. 행이 예상을 위반하는 경우 즉시 실행을 중지합니다.

DLT에서 Python을 사용하여 변경 피드에서 데이터 캡처 변경

Python API의 apply_changes() 함수를 사용하여 DLT CDC(변경 데이터 캡처) 기능을 사용하여 CDF(변경 데이터 피드)에서 원본 데이터를 처리합니다.

중요하다

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes() 대상 테이블의 스키마를 지정할 때는 __START_AT__END_AT 열을 sequence_by 필드와 동일한 데이터 형식으로 포함해야 합니다.

필요한 대상 테이블을 만들려면 DLT Python 인터페이스에서 create_streaming_table() 함수를 사용할 수 있습니다.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

메모

APPLY CHANGES 처리 시, INSERTUPDATE 이벤트에 대한 기본 동작은 원본에서 CDC 이벤트를하여 다음을 수행하는 것입니다: 지정한 키와 일치하는 대상 테이블의 행을 업데이트하거나, 일치하는 레코드가 없을 경우 대상 테이블에 새 행을 삽입합니다. DELETE 이벤트에 대한 처리는 APPLY AS DELETE WHEN 조건으로 지정할 수 있습니다.

변경 피드를 사용한 CDC 처리에 대한 자세한 내용은 변경 내용 적용 API: DLT사용하여 변경 데이터 캡처 간소화를 참조하세요. apply_changes() 함수를 사용하는 예제는 예제: CDF 원본 데이터에서는 SCD 형식 1 및 SCD 형식 2 처리를 참조하세요.

중요하다

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes 대상 테이블 스키마를 지정할 때는 __START_AT 필드와 데이터 형식이 같은 __END_ATsequence_by 열을 포함해야 합니다.

변경 사항 적용 API 참조: DLT사용하여 변경 데이터 캡처 간소화.

논쟁
target
형식: str
업데이트할 테이블의 이름입니다. create_streaming_table() 함수를 사용하여 apply_changes() 함수를 실행하기 전에 대상 테이블을 만들 수 있습니다.
이 매개 변수는 필수입니다.
source
형식: str
CDC 레코드를 포함하는 데이터 원본입니다.
이 매개 변수는 필수입니다.
keys
형식: list
원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 이는 대상 테이블의 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다.
다음 중 하나를 지정할 수 있습니다.
  • 문자열 목록: ["userId", "orderId"]
  • Spark SQL col() 함수 목록: [col("userId"), col("orderId"]

col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어 col(userId)사용할 수 있지만 col(source.userId)사용할 수는 없습니다.
이 매개 변수는 필수입니다.
sequence_by
형식: str 또는 col()
원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. DLT는 이 시퀀싱을 사용하여 순서가 잘못 도착하는 변경 이벤트를 처리합니다.
다음 중 하나를 지정할 수 있습니다.
  • 문자열: "sequenceNum"
  • Spark SQL col() 함수: col("sequenceNum")

col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어 col(userId)사용할 수 있지만 col(source.userId)사용할 수는 없습니다.
지정된 열은 정렬 가능한 데이터 형식이어야 합니다.
이 매개 변수는 필수입니다.
ignore_null_updates
형식: bool
대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치한 경우, ignore_null_updatesTrue일 때 null 값을 가진 열은 대상에 기존의 값을 유지합니다. 중첩된 열로, 값이 null인 경우에도 적용됩니다. ignore_null_updatesFalse일 때, 기존 값은 null로 덮어씁니다.
이 매개 변수는 선택 사항입니다.
기본값은 False.
apply_as_deletes
형식: str 또는 expr()
CDC 이벤트를 업서트가 아닌 DELETE로 처리해야 할 시기를 지정합니다. 정렬이 맞지 않는 데이터를 처리하기 위해 삭제된 행은 기본 델타 테이블에서 임시로 '툼스톤'으로 유지되고 메타스토어에서 이러한 툼스톤을 필터링하는 뷰가 만들어집니다. 보존 간격은 다음을 사용하여 구성할 수 있습니다.
pipelines.cdc.tombstoneGCThresholdInSeconds 테이블 속성입니다.
다음 중 하나를 지정할 수 있습니다.
  • 문자열: "Operation = 'DELETE'"
  • Spark SQL expr() 함수: expr("Operation = 'DELETE'")

이 매개 변수는 선택 사항입니다.
apply_as_truncates
형식: str 또는 expr()
CDC 이벤트를 테이블 TRUNCATE전체로 처리해야 하는 시기를 지정합니다. 이 절은 대상 테이블의 전체 잘림을 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다.
apply_as_truncates 매개 변수는 SCD 형식 1에 대해서만 지원됩니다. SCD 유형 2는 자르기 작업을 지원하지 않습니다.
다음 중 하나를 지정할 수 있습니다.
  • 문자열: "Operation = 'TRUNCATE'"
  • Spark SQL expr() 함수: expr("Operation = 'TRUNCATE'")

이 매개 변수는 선택 사항입니다.
column_list
except_column_list
형식: list
대상 테이블에 포함할 열의 하위 집합입니다. column_list 사용하여 포함할 열의 전체 목록을 지정합니다. except_column_list 사용하여 제외할 열을 지정합니다. 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
  • column_list = ["userId", "name", "city"].
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어 col(userId)사용할 수 있지만 col(source.userId)사용할 수는 없습니다.
이 매개 변수는 선택 사항입니다.
기본값은 column_list 또는 except_column_list 인수가 함수에 전달되지 않는 경우 대상 테이블에 모든 열을 포함하는 것입니다.
stored_as_scd_type
형식: str 또는 int
레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부입니다.
SCD 유형 1의 경우 1, SCD 형식 2의 경우 2 설정합니다.
이 절은 선택 사항입니다.
기본값은 SCD 형식 1입니다.
track_history_column_list
track_history_except_column_list
형식: list
대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. track_history_column_list 사용하여 추적할 열의 전체 목록을 지정합니다. 쓰다
track_history_except_column_list 추적으로부터 제외할 열을 지정합니다. 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어 col(userId)사용할 수 있지만 col(source.userId)사용할 수는 없습니다.
이 매개 변수는 선택 사항입니다.
기본값은 track_history_column_list 없는 경우 대상 테이블에 모든 열을 포함하는 것입니다.
track_history_except_column_list 인수가 함수에 전달됩니다.

DLT에서 Python을 사용하여 데이터베이스 스냅샷에서 데이터 캡처 변경

중요하다

APPLY CHANGES FROM SNAPSHOT API는 공개 프리뷰입니다.

Python API의 apply_changes_from_snapshot() 함수를 사용하여 DLT CDC(변경 데이터 캡처) 기능을 사용하여 데이터베이스 스냅샷에서 원본 데이터를 처리합니다.

중요하다

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes_from_snapshot() 대상 테이블의 스키마를 지정할 때는 __START_AT 필드와 데이터 형식이 같은 __END_ATsequence_by 열도 포함해야 합니다.

필요한 대상 테이블을 만들려면 DLT Python 인터페이스에서 create_streaming_table() 함수를 사용할 수 있습니다.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

메모

APPLY CHANGES FROM SNAPSHOT 처리의 경우 동일한 키를 가진 일치하는 레코드가 대상에 없는 경우 새 행을 삽입하는 것이 기본 동작입니다. 일치하는 레코드가 있는 경우 행의 값이 변경된 경우에만 업데이트됩니다. 대상에 키가 있지만 더 이상 원본에 없는 행이 삭제됩니다.

스냅샷을 사용한 CDC 처리에 대한 자세한 내용은 변경 사항 적용 API: DLT사용하여 변경 데이터 캡처 간소화를 참조하세요. apply_changes_from_snapshot() 함수를 사용하는 예제는 정기적인 스냅샷 수집기록 스냅샷 수집 예제를 참조하세요.

논쟁
target
형식: str
업데이트할 테이블의 이름입니다. 함수를 실행하기 전에 apply_changes() 함수를 사용하여 대상 테이블을 만들 수 있습니다.
이 매개 변수는 필수입니다.
source
형식: str 또는 lambda function
주기적으로 스냅샷할 테이블 또는 뷰의 이름 또는 처리할 스냅샷 DataFrame 및 스냅샷 버전을 반환하는 Python 람다 함수입니다. source 인수구현을 참조하세요.
이 매개 변수는 필수입니다.
keys
형식: list
원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 이는 대상 테이블의 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다.
다음 중 하나를 지정할 수 있습니다.
  • 문자열 목록: ["userId", "orderId"]
  • Spark SQL col() 함수 목록: [col("userId"), col("orderId"]

col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어 col(userId)사용할 수 있지만 col(source.userId)사용할 수는 없습니다.
이 매개 변수는 필수입니다.
stored_as_scd_type
형식: str 또는 int
레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부입니다.
SCD 유형 1의 경우 1, SCD 형식 2의 경우 2 설정합니다.
이 절은 선택 사항입니다.
기본값은 SCD 형식 1입니다.
track_history_column_list
track_history_except_column_list
형식: list
대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. track_history_column_list 사용하여 추적할 열의 전체 목록을 지정합니다. 사용하다
track_history_except_column_list 추적으로부터 제외할 열을 지정합니다. 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어 col(userId)사용할 수 있지만 col(source.userId)사용할 수는 없습니다.
이 매개 변수는 선택 사항입니다.
기본값은 track_history_column_list 없는 경우 대상 테이블에 모든 열을 포함하는 것입니다.
track_history_except_column_list 인수가 함수에 전달됩니다.

source 인수 구현

apply_changes_from_snapshot() 함수에는 source 인수가 포함됩니다. 기록 스냅샷을 처리하기 위해 source 인수는 처리할 스냅샷 데이터와 스냅샷 버전을 포함하는 Python DataFrame이라는 두 값을 apply_changes_from_snapshot() 함수에 반환하는 Python 람다 함수여야 합니다.

다음은 람다 함수의 서명입니다.

lambda Any => Optional[(DataFrame, Any)]
  • 람다 함수에 대한 인수는 가장 최근에 처리된 스냅샷 버전입니다.
  • 람다 함수의 반환 값은 None 또는 두 값의 튜플입니다. 튜플의 첫 번째 값은 처리할 스냅샷을 포함하는 DataFrame입니다. 튜플의 두 번째 값은 스냅샷의 논리적 순서를 나타내는 스냅샷 버전입니다.

람다 함수를 구현하고 호출하는 예제:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

DLT 런타임은 apply_changes_from_snapshot() 함수가 포함된 파이프라인이 트리거될 때마다 다음 단계를 수행합니다.

  1. next_snapshot_and_version 함수를 실행하여 다음 스냅샷 DataFrame 및 해당 스냅샷 버전을 로드합니다.
  2. DataFrame이 반환되지 않으면 실행이 종료되고 파이프라인 업데이트가 완료된 것으로 표시됩니다.
  3. 새 스냅샷의 변경 내용을 검색하고 증분 방식으로 대상 테이블에 적용합니다.
  4. 1단계로 돌아와서 다음 스냅샷 및 해당 버전을 로드합니다.

제한 사항

DLT Python 인터페이스에는 다음과 같은 제한 사항이 있습니다.

pivot() 함수는 지원되지 않습니다. Spark에서 pivot 작업을 수행하려면 출력 스키마를 계산하기 위해 입력 데이터를 즉시 로드해야 합니다. 이 기능은 DLT에서 지원되지 않습니다.