Поделиться через


Справочник по языку Python DLT

В этой статье содержатся сведения о интерфейсе программирования DLT Python.

Дополнительные сведения об API SQL см. в справочнике по языку DLT SQL.

Дополнительные сведения о настройке автозагрузчика см. в разделе Что такое автозагрузчик?.

перед началом работы

Ниже приведены важные рекомендации при реализации конвейеров с помощью интерфейса Python DLT:

  • Так как функции Python table() и view() вызываются несколько раз во время планирования и выполнения обновления конвейера, не включают код в одну из этих функций, которые могут иметь побочные эффекты (например, код, который изменяет данные или отправляет сообщение электронной почты). Чтобы избежать непредвиденного поведения, функции Python, определяющие наборы данных, должны содержать только код, необходимый для определения таблицы или представления.
  • Для выполнения таких операций, как отправка сообщений электронной почты или интеграция с внешней службой мониторинга, особенно в функциях, определяющих наборы данных, используйте перехватчики событий. Реализация этих операций в функциях, определяющих наборы данных, приведет к неожиданному поведению.
  • Функции Python table и view должны возвращать кадр данных. Некоторые функции, работающие с дейтафреймами, не возвращают дейтафреймы и не должны использоваться. Эти операции включают такие функции, как collect(), count(), toPandas(), save()и saveAsTable(). Поскольку преобразования DataFrame выполняются после того, как полный граф потока данных был разрешён, использование таких операций может приводить к непреднамеренным побочным эффектам.

Импорт модуля Python dlt

Функции Python DLT определяются в модуле dlt. Конвейеры, реализованные с помощью API Python, должны импортировать этот модуль:

import dlt

создание материализованного представления или потоковой таблицы DLT

В Python DLT определяет, следует ли обновлять набор данных в виде материализованного представления или таблицы потоковой передачи на основе определяющего запроса. Декоратор @table можно использовать для определения как материализованных представлений, так и потоковых таблиц.

Чтобы определить материализованное представление в Python, примените @table к запросу, который выполняет статическое чтение к источнику данных. Чтобы определить потоковую таблицу, примените @table к запросу, который выполняет потоковое чтение из источника данных или используйте функцию create_streaming_table(). Оба типа набора данных имеют одну и ту же спецификацию синтаксиса, как показано ниже.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

создание представления DLT

Чтобы определить представление в Python, примените декоратор @view. Как и декоратор @table, можно использовать представления в DLT для статических или потоковых наборов данных. Ниже приведен синтаксис для определения представлений с помощью Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Пример. Определение таблиц и представлений

Чтобы определить таблицу или представление в Python, примените декоратор @dlt.view или @dlt.table к функции. Имя функции или параметр name можно использовать для назначения имени таблицы или представления. В следующем примере определяются два разных набора данных: представление с именем taxi_raw, которое принимает JSON-файл в качестве источника входных данных и таблицу с именем filtered_data, которая принимает представление taxi_raw в качестве входных данных:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Пример. Доступ к набору данных, определенному в том же конвейере

Заметка

Хотя функции dlt.read() и dlt.read_stream() по-прежнему доступны и полностью поддерживаются интерфейсом Python DLT, Databricks рекомендует всегда использовать функции spark.read.table() и spark.readStream.table() из-за следующих действий:

  • Функции spark поддерживают чтение внутренних и внешних наборов данных, включая наборы данных во внешнем хранилище или определенные в других конвейерах. Функции dlt поддерживают только чтение внутренних наборов данных.
  • Функции spark поддерживают указание параметров, таких как skipChangeCommits, для выполнения операций чтения. Указание параметров не поддерживается функциями dlt.

Чтобы получить доступ к набору данных, определенному в том же конвейере, используйте функции spark.read.table() или spark.readStream.table():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Заметка

При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблица customersзаписывается и считывается из каталога по умолчанию и схемы, настроенной для конвейера.

Пример. Чтение из таблицы, зарегистрированной в хранилище метаданных

Чтобы считывать данные из таблицы, зарегистрированной в хранилище метаданных Hive, в аргументе функции можно указать имя таблицы с именем базы данных:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Пример чтения из таблицы каталога Unity см. в разделе прием данных в конвейер каталога Unity.

пример . Доступ к набору данных с помощью spark.sql

Вы также можете вернуть набор данных с помощью выражения spark.sql в функции запроса. Чтобы прочитать из внутреннего набора данных, можно оставить имя без уточнения для использования каталога и схемы по умолчанию, или указать их заранее.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Окончательное удаление записей из материализованного представления или потоковой таблицы

Чтобы окончательно удалить записи из материализованного представления или потоковой таблицы с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы обеспечить перманентное удаление записей из материализованного представления, см. удаление записей из материализованного представления с активными векторами удаления. Чтобы гарантировать удаление записей из потоковой таблицы, см. постоянное удаление записей из потоковой таблицы.

Записывайте во внешние службы потоковой передачи событий или таблицы Delta с помощью DLT API sink

Важный

API sink DLT находится в общедоступной предварительной версии.

Заметка

  • Запуск полного обновления не очищает данные из хранилищ. Все повторно обработанные данные будут добавлены в приемник, и существующие данные не будут изменены.
  • Ожидания DLT не поддерживаются в API sink.

Чтобы записать в стриминговую службу, такую как Apache Kafka или Центры событий Azure, или в таблицу Delta из конвейера DLT, используйте функцию create_sink(), включенную в модуль Python dlt. После создания приемника с функцией create_sink() используется приемник в потоке добавления для записи данных в приемник. Единственный тип потока, поддерживаемый функцией create_sink(), — это поток добавления. Другие типы потоков, например apply_changes, не поддерживаются.

Ниже приведен синтаксис для создания приемника с функцией create_sink():

create_sink(<sink_name>, <format>, <options>)
Аргументы
name
Тип: str
Строка, идентифицирующая приемник и используемая для ссылки на него и управления им. Имена приемников должны быть уникальными для конвейера, включая весь исходный код, например записные книжки или модули, которые являются частью конвейера.
Этот параметр является обязательным.
format
Тип: str
Строка, определяющая выходной формат либо kafka, либо delta.
Этот параметр является обязательным.
options
Тип: dict
Необязательный список параметров приемника, отформатированный в виде {"key": "value"}, где и значение, и ключ являются строками. Поддерживаются все параметры среды выполнения Databricks, поддерживаемые приемниками Kafka и Delta. См. параметры Kafka в разделе Конфигурация модуля записи структурированной потоковой передачи Kafka. Параметры Delta см. в таблице Delta в качестве приемника.

Пример: создание приемника Kafka с помощью функции create_sink()

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Пример: Создание приемника Delta с помощью функции create_sink() и пути к файловой системе

В следующем примере создается загрузка, которая записывает данные в таблицу Delta, используя путь файловой системы к таблице.

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

Пример: Создание приемника Delta с помощью функции create_sink() и имени таблицы каталога Unity Catalog

Заметка

Приемник Delta поддерживает внешние и управляемые таблицы каталога Unity и управляемые таблицы хранилища метаданных Hive. Имена таблиц должны быть полностью квалифицированы. Например, таблицы каталога Unity должны использовать трехуровневый идентификатор: <catalog>.<schema>.<table>. Таблицы хранилища метаданных Hive должны использовать <schema>.<table>.

В следующем примере создается приемник, который записывает данные в одну из Delta-таблиц, передавая имя таблицы в каталоге Unity.

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

Пример: Используйте поток добавления для записи в хранилище Delta

В следующем примере создается приемник, который записывается в таблицу Delta, а затем создает поток добавления для записи в этот приемник:

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

Пример: использование потока добавления для записи в хранилище Kafka

В следующем примере создается приемник, который записывается в раздел Kafka, а затем создает поток добавления для записи в этот приемник:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Схема фрейма данных, записанного в Kafka, должна содержать столбцы, указанные в настройках записи структурированной потоковой передачи Kafka.

Создание таблицы для использования в качестве целевой цели операций потоковой передачи

Используйте функцию create_streaming_table() для создания целевой таблицы для выходных записей потоковых операций, включая apply_changes(), apply_changes_from_snapshot()и @append_flow.

Заметка

Функции create_target_table() и create_streaming_live_table() устарели. Databricks рекомендует обновить существующий код для использования функции create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Аргументы
name
Тип: str
Имя таблицы.
Этот параметр является обязательным.
comment
Тип: str
Необязательное описание таблицы.
spark_conf
Тип: dict
Необязательный список конфигураций Spark для выполнения этого запроса.
table_properties
Тип: dict
Необязательный список свойств таблицы.
partition_cols
Тип: array
Необязательный список одного или нескольких столбцов, используемых для секционирования таблицы.
cluster_by
Тип: array
При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации.
См. Использование кластеризации жидкости для таблиц Delta.
path
Тип: str
Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию использует место хранения данных конвейера.
schema
Тип: str или StructType
Необязательное определение схемы для таблицы. Схемы можно определить как строку DDL SQL или с помощью Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail
Тип: dict
Необязательные ограничения качества данных для таблицы. См. множество ожиданий.
row_filter (общедоступная предварительная версия)
Тип: str
Необязательная клаузула фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов.

Управление материализацией таблиц

Таблицы также предлагают дополнительный контроль над их материализацией:

Заметка

Для таблиц размером меньше 1 ТБ, Databricks рекомендует доверять управление организацией данных DLT. Не следует указывать столбцы разбиения, если вы не ожидаете, что таблица увеличится и превысит терабайт.

Пример : Укажите схему и столбцы кластера

При необходимости можно указать схему таблицы с помощью StructType Python или строки DDL SQL. При указании строки DDL определение может включать созданные столбцы.

В следующем примере создается таблица с именем sales со схемой, указанной с помощью StructTypePython:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

В следующем примере задается схема таблицы с использованием строки DDL, определяется созданный столбец и определяются столбцы кластеризации.

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

По умолчанию DLT определяет схему из определения table, если не указать схему.

Пример: указание столбцов секций

В следующем примере с помощью строки DDL задается схема таблицы, определяется сгенерированный столбец и задается столбец секционирования:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Пример. Определение ограничений таблицы

Важный

Ограничения таблиц находятся в общедоступной предварительной версии.

При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. пункт CONSTRAINT в справочной информации по языку SQL.

В следующем примере определяется таблица с ограничением первичного и внешнего ключа:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Пример. Определение фильтра строк и маски столбцов

Важный

Фильтры строк и маски столбцов находятся в общедоступной предварительной версии.

Чтобы создать материализованное представление или таблицу потоковой передачи с фильтром строк и маской столбцов, используйте предикат ROW FILTER и предикат MASK . В следующем примере показано, как определить материализованное представление и потоковую таблицу с фильтрацией строк и маскированием столбцов.

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Дополнительные сведения о фильтрах строк и масках столбцов см. в разделе о публикации таблиц с фильтрами строк и масками столбцов.

Настройка потоковой таблицы для пропуска изменений в исходной потоковой таблице

Заметка

  • Флаг skipChangeCommits работает только с spark.readStream с помощью функции option(). Этот флаг нельзя использовать в функции dlt.read_stream().
  • Флаг skipChangeCommits нельзя использовать, если исходная таблица потоковой передачи определена в качестве цели функции apply_changes().

По умолчанию для потоковых таблиц требуются источники, которые предназначены для добавления. Если потоковая таблица использует другую потоковую таблицу в качестве источника, и для исходной потоковой таблицы требуются обновления или удаления, например, для обработки в соответствии с GDPR «право на забвение», можно установить флаг skipChangeCommits при чтении исходной потоковой таблицы, чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе Игнорировать обновления и удалять.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Свойства DLT для Python

В следующих таблицах описаны параметры и свойства, которые можно указать при определении таблиц и представлений с помощью DLT:

@table или @view
name
Тип: str
Необязательное имя таблицы или представления. Если оно не определено, имя функции используется в качестве имени таблицы или представления.
comment
Тип: str
Необязательное описание таблицы.
spark_conf
Тип: dict
Необязательный список конфигураций Spark для выполнения этого запроса.
table_properties
Тип: dict
Необязательный список свойств таблицы .
path
Тип: str
Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию использует расположение конвейерного хранилища.
partition_cols
Тип: a collection of str
Необязательная коллекция, например, list, которая может содержать один или несколько столбцов для секционирования таблицы.
cluster_by
Тип: array
При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации.
См. Использование кластеризации жидкости для таблиц Delta.
schema
Тип: str или StructType
Необязательное определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType.
temporary
Тип: bool
Создайте таблицу, но не публикуйте метаданные для таблицы. Ключевое слово temporary указывает DLT создать таблицу, доступную для конвейера, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.
Значение по умолчанию — False.
row_filter (общедоступная предварительная версия)
Тип: str
Необязательная клаузула фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов.
Определение таблицы или представления
def <function-name>()
Функция Python, определяющая набор данных. Если параметр name не задан, <function-name> используется в качестве имени целевого набора данных.
query
Инструкция Spark SQL, возвращающая набор данных Spark или кадр данных Koalas.
Используйте dlt.read() или spark.read.table() для выполнения полного чтения из набора данных, определенного в том же потоке данных. Чтобы прочитать внешний набор данных, используйте функцию spark.read.table(). Нельзя использовать dlt.read() для чтения внешних наборов данных. Так как spark.read.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо функции dlt.read().
При определении набора данных в конвейере по умолчанию он будет использовать каталог и схему, определенную в конфигурации конвейера. Функцию spark.read.table() можно использовать для чтения из набора данных, определенного в конвейере, без уточнения. Например, для чтения из набора данных с именем customers:
spark.read.table("customers")
Вы также можете использовать функцию spark.read.table() для чтения из таблицы, зарегистрированной в хранилище метаданных, при необходимости указав имя таблицы с именем базы данных:
spark.read.table("sales.customers")
Используйте dlt.read_stream() или spark.readStream.table() для выполнения потокового чтения из набора данных, определенного в том же конвейере. Для выполнения потокового чтения из внешнего набора данных используйте
функция spark.readStream.table(). Так как spark.readStream.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо функции dlt.read_stream().
Чтобы определить запрос в функции DLT table с помощью синтаксиса SQL, используйте функцию spark.sql. См. пример. Доступ к набору данных с помощью spark.sql. Чтобы определить запрос в функции DLT table с помощью Python, используйте синтаксис PySpark.
Ожидания
@expect("description", "constraint")
Объявите ограничение качества данных, определенное по
description. Если строка нарушает ожидание, включите строку в целевой набор данных.
@expect_or_drop("description", "constraint")
Объявите ограничение качества данных, определенное по
description. Если строка нарушает ожидание, удалите строку из целевого набора данных.
@expect_or_fail("description", "constraint")
Объявите ограничение качества данных, определенное по
description. Если строка нарушает ожидание, немедленно остановите выполнение.
@expect_all(expectations)
Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, включите строку в целевой набор данных.
@expect_all_or_drop(expectations)
Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, удалите строку из целевого набора данных.
@expect_all_or_fail(expectations)
Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, немедленно остановите выполнение.

запись измененных данных из канала изменений с помощью Python в DLT

Используйте функцию apply_changes() в API Python, чтобы использовать функционал фиксации изменений данных (CDC) для обработки исходных данных из канала передачи изменений данных (CDF).

Важный

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes() необходимо включить столбцы __START_AT и __END_AT с тем же типом данных, что и поля sequence_by.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python DLT.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Заметка

Для обработки APPLY CHANGES стандартное поведение для событий INSERT и UPDATE заключается в обновлении или вставке событий CDC из источника: обновляются все строки в целевой таблице, соответствующие указанным ключам, или вставляется новая строка, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE можно указать с условием APPLY AS DELETE WHEN.

Дополнительные сведения об обработке CDC с помощью канала изменений см. в статье API APPLY CHANGES: упрощение отслеживания измененных данных с помощью DLT. Пример использования функции apply_changes() см. в разделе Пример: Обработка SCD типа 1 и SCD типа 2 с исходными данными CDF.

Важный

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes необходимо включить столбцы __START_AT и __END_AT с тем же типом данных, что и поле sequence_by.

См. Интерфейсы API APPLY CHANGES: Упрощение фиксации данных изменений с помощью DLT.

Аргументы
target
Тип: str
Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table(), чтобы создать целевую таблицу перед выполнением функции apply_changes().
Этот параметр является обязательным.
source
Тип: str
Источник данных, содержащий записи CDC.
Этот параметр является обязательным.
keys
Тип: list
Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице.
Можно указать любой из следующих вариантов:
  • Список строк: ["userId", "orderId"]
  • Список функций col() Spark SQL: [col("userId"), col("orderId"]

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.
Этот параметр является обязательным.
sequence_by
Тип: str или col()
Имя столбца, указывающее логический порядок событий CDC в исходных данных. DLT использует эту последовательность для обработки событий изменений, поступающих в неправильном порядке.
Можно указать любой из следующих вариантов:
  • Строка: "sequenceNum"
  • Функция Spark SQL col(): col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.
Указанный столбец должен быть сортируемым типом данных.
Этот параметр является обязательным.
ignore_null_updates
Тип: bool
Разрешить принимать обновления, содержащие подмножество целевых столбцов. Если событие CDC соответствует существующей строке и ignore_null_updates равно True, столбцы с null сохраняют свои значения в целевом объекте. Это также относится к вложенным столбцам со значением null. Если ignore_null_updates равно False, существующие значения перезаписываются со значениями null.
Этот параметр является необязательным.
Значение по умолчанию — False.
apply_as_deletes
Тип: str или expr()
Указывает, когда событие CDC должно рассматриваться как DELETE, а не upsert. Чтобы обрабатывать данные вне порядка, удаленная строка временно сохраняется в виде могилы в базовой таблице Delta, а представление создается в хранилище метаданных, которое фильтрует эти могилы. Интервал хранения можно настроить с помощью
pipelines.cdc.tombstoneGCThresholdInSeconds свойстве таблицы.
Можно указать любой из следующих вариантов:
  • Строка: "Operation = 'DELETE'"
  • Функция expr() Spark SQL: expr("Operation = 'DELETE'")

Этот параметр является необязательным.
apply_as_truncates
Тип: str или expr()
Указывает, когда событие CDC следует рассматривать как полную таблицу TRUNCATE. Так как это предложение активирует полное удаление данных целевой таблицы, его желательно использовать только для конкретных случаев, требующих этого функционала.
Параметр apply_as_truncates поддерживается только для SCD типа 1. Тип SCD 2 не поддерживает операции усечения данных.
Можно указать любой из следующих вариантов:
  • Строка: "Operation = 'TRUNCATE'"
  • В Spark SQL функция expr(): expr("Operation = 'TRUNCATE'")

Этот параметр является необязательным.
column_list
except_column_list
Тип: list
Подмножество столбцов для включения в целевую таблицу. Используйте column_list, чтобы указать полный список столбцов для включения. Используйте except_column_list, чтобы указать столбцы для исключения. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:
  • column_list = ["userId", "name", "city"].
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.
Этот параметр является необязательным.
Значение по умолчанию — включать все столбцы в целевую таблицу, если аргумент column_list или except_column_list не передается функции.
stored_as_scd_type
Тип: str или int
Следует ли хранить записи как SCD типа 1 или SCD типа 2.
Установите значение 1 для SCD типа 1 или 2 для SCD типа 2.
Это предложение является необязательным.
Значение по умолчанию — тип SCD 1.
track_history_column_list
track_history_except_column_list
Тип: list
Подмножество столбцов выходных данных для исторического отслеживания в целевой таблице. Используйте track_history_column_list, чтобы указать полный список столбцов для отслеживания. Использование
track_history_except_column_list, чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.
Этот параметр является необязательным.
Значение по умолчанию — включать все столбцы в целевую таблицу, если не track_history_column_list или
track_history_except_column_list аргумент передается функции.

Сбор изменений данных из моментальных снимков базы данных с помощью Python в DLT

Важный

API APPLY CHANGES FROM SNAPSHOT находится в общедоступной предварительной версии.

Используйте функцию apply_changes_from_snapshot() в API Python для использования функции фиксации изменений данных DLT (CDC) с целью обработки исходных данных из снимков базы данных.

Важный

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes_from_snapshot() необходимо также включить столбцы __START_AT и __END_AT с тем же типом данных, что и поле sequence_by.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python DLT.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Заметка

Для обработки APPLY CHANGES FROM SNAPSHOT поведение по умолчанию заключается в том, чтобы вставить новую строку, если соответствующая запись с теми же ключами не существует в целевом объекте. Если соответствующая запись существует, она обновляется только в том случае, если все значения в строке изменились. Строки с ключами, которые присутствуют в целевом объекте, но больше не значатся в источнике, удаляются.

Для получения дополнительной информации о процессе CDC с использованием моментальных снимков см. раздел API Apply Changes: упрощение отслеживания изменений данных с помощью DLT. Примеры использования функции apply_changes_from_snapshot() смотрите в примере периодического приема моментальных снимков и примере приема исторических моментальных снимков.

Аргументы
target
Тип: str
Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table() для создания целевой таблицы перед запуском функции apply_changes().
Этот параметр является обязательным.
source
Тип: str или lambda function
Либо имя таблицы или представления для создания периодических моментальных снимков, либо лямбда-функция на Python, которая возвращает DataFrame моментального снимка для последующей обработки и версию моментального снимка. См. Реализация аргумента source
Этот параметр является обязательным.
keys
Тип: list
Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице.
Можно указать любой из следующих вариантов:
  • Список строк: ["userId", "orderId"]
  • Список функций col() Spark SQL: [col("userId"), col("orderId"]

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.
Этот параметр является обязательным.
stored_as_scd_type
Тип: str или int
Следует ли хранить записи как SCD типа 1 или SCD типа 2.
Установите значение 1 для SCD типа 1 или 2 для SCD типа 2.
Это предложение является необязательным.
Значение по умолчанию — тип SCD 1.
track_history_column_list
track_history_except_column_list
Тип: list
Подмножество столбцов выходных данных, которые необходимо отслеживать для ведения истории в целевой таблице. Используйте track_history_column_list, чтобы указать полный список столбцов для отслеживания. Использование
track_history_except_column_list, чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.
Этот параметр является необязательным.
Значение по умолчанию — это включать все столбцы в целевую таблицу, если не указаны track_history_column_list или другие условия.
track_history_except_column_list аргумент передается функции.

Реализация аргумента source

Функция apply_changes_from_snapshot() включает аргумент source. Для обработки исторических моментальных снимков аргумент source, как ожидается, будет лямбда-функцией Python, которая возвращает два значения функции apply_changes_from_snapshot(): DataFrame Python, содержащий данные моментального снимка для обработки, и версию моментального снимка.

Ниже приведена подпись лямбда-функции:

lambda Any => Optional[(DataFrame, Any)]
  • Аргумент лямбда-функции является недавно обработанной версией моментального снимка.
  • Возвращаемое значение лямбда-функции — None или кортеж из двух значений: первое значение кортежа — это DataFrame, содержащий снимок данных для обработки. Вторым значением кортежа является версия моментального снимка, описывающая его логический порядок.

Пример, реализующий и вызывающий лямбда-функцию:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Среда выполнения DLT выполняет следующие действия при каждом запуске конвейера, содержащего функцию apply_changes_from_snapshot():

  1. Запускает функцию next_snapshot_and_version для загрузки следующего моментального снимка DataFrame и соответствующей версии этого снимка.
  2. Если DataFrame не возвращается, выполнение завершается, и обновление конвейера помечается как завершённое.
  3. Обнаруживает изменения в новом моментальном снимке и пошагово применяет их к целевой таблице.
  4. Возвращается к первому шагу, чтобы загрузить следующий снапшот и его версию.

Ограничения

Интерфейс Python DLT имеет следующее ограничение:

Функция pivot() не поддерживается. Операция pivot в Spark требует активной загрузки входных данных для вычисления выходной схемы. Эта возможность не поддерживается в DLT.