Carregue e processe dados incrementalmente com fluxos DLT
Este artigo explica o que são fluxos e como você pode usar fluxos em pipelines DLT para processar dados incrementalmente de uma fonte para uma tabela de streaming de destino. Nas DLT, os fluxos são definidos de duas formas:
- Um fluxo é definido automaticamente quando você cria uma consulta que atualiza uma tabela de streaming.
- A DLT também fornece funcionalidades para definir explicitamente fluxos destinados a um processamento mais complexo, como a anexação a uma tabela de streaming a partir de várias fontes de streaming.
Este artigo discute os fluxos implícitos que são criados quando você define uma consulta para atualizar uma tabela de streaming e, em seguida, fornece detalhes sobre a sintaxe para definir fluxos mais complexos.
O que é um fluxo?
Na DLT, um fluxo de consulta é um streaming que processa dados da fonte incrementalmente para atualizar uma tabela de streaming de destino. A maioria dos conjuntos de dados DLT criados em um pipeline define o fluxo como parte da consulta e não requer a definição explícita do fluxo. Por exemplo, você cria uma tabela de streaming em DLT em um único comando DDL em vez de usar instruções de tabela e fluxo separadas para criar a tabela de streaming:
Observação
Este exemplo CREATE FLOW
é fornecido apenas para fins ilustrativos e inclui palavras-chave que não são sintaxe DLT válida.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Além do fluxo padrão definido por uma consulta, as interfaces DLT Python e SQL fornecem fluxo de acréscimo funcionalidade. O fluxo de anexação suporta processamento que requer dados de várias fontes de streaming para atualizar uma única tabela de streaming. Por exemplo, você pode usar a funcionalidade de fluxo de acréscimo quando tiver uma tabela e um fluxo de streaming existentes e quiser adicionar uma nova fonte de streaming que grave nessa tabela de streaming existente.
Use o fluxo de acréscimo para gravar em uma tabela de streaming a partir de vários fluxos de origem
Use o decorador @append_flow
na interface Python ou a cláusula CREATE FLOW
na interface SQL para gravar numa tabela de streaming de várias fontes de streaming. Utilize o fluxo de adição para tarefas de processamento como as seguintes:
- Adicione fontes de streaming que acrescentam dados a uma tabela de streaming existente sem exigir uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que opera. À medida que novas regiões são distribuídas, você pode adicionar os novos dados de região à tabela sem executar uma atualização completa. Consulte Exemplo: Gravar em uma tabela em fluxo contínuo a partir de vários tópicos do sistema Kafka.
- Atualize uma tabela de streaming anexando dados históricos ausentes (backfilling). Por exemplo, você tem uma tabela de streaming existente que é escrita por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de streaming e não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Consulte Exemplo: Executar um preenchimento de dados único.
- Combine informações de várias fontes e grava numa tabela única de streaming em vez de usar a cláusula
UNION
numa consulta. Usar o processamento de fluxo de acréscimo em vez deUNION
permite atualizar a tabela de destino de forma incremental sem executar uma atualização completa. Consulte Exemplo: Use o fluxo de processamento por adição em vez deUNION
.
O destino para a saída de registros pelo processamento de fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.
Importante
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função
create_streaming_table()
ou em uma definição de tabela existente. Não é possível definir expectativas na definição@append_flow
. - Os fluxos são identificados por um nome de fluxo , e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
- Não é possível reutilizar um nome de fluxo em um pipeline, porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
A sintaxe do @append_flow
é a seguinte:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
Exemplo: Escrever numa tabela de streaming a partir de vários tópicos de Kafka
O exemplo a seguir cria uma tabela de streaming chamada kafka_target
e recebe dados para essa tabela de streaming de dois tópicos de Kafka.
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para saber mais sobre a função com valor de tabela read_kafka()
usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.
Em Python, você pode criar programaticamente vários fluxos que visam uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos de Kafka.
Observação
Esse padrão tem os mesmos requisitos que usar um loop de for
para criar tabelas. Você deve passar explicitamente um valor Python para a função que define o fluxo. Consulte criar tabelas num loop for
.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exemplo: Executar um preenchimento de dados único
Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de streaming:
Observação
Para garantir um verdadeiro preenchimento único quando a consulta de preenchimento fizer parte de um pipeline executado de forma agendada ou contínua, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem no diretório de preenchimento, deixe a consulta no lugar.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
Exemplo: Use o processamento de fluxo de acréscimo em vez de UNION
Em vez de usar uma consulta com uma cláusula UNION
, você pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de streaming. O uso de consultas de fluxo de acréscimo em vez de UNION
permite que você anexe a uma tabela de streaming de várias fontes sem executar um de atualização completa.
O exemplo Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Os exemplos a seguir substituem a consulta UNION
por consultas de fluxo contínuo:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);