Carregar dados usando tabelas de streaming no Databricks SQL
O Databricks recomenda o uso de tabelas de streaming para ingerir dados usando o Databricks SQL. Uma tabela de streaming é uma tabela registrada no Unity Catalog com suporte extra para streaming ou processamento incremental de dados. Um pipeline DLT é criado automaticamente para cada tabela de streaming. Você pode usar tabelas de streaming para carregamento incremental de dados do Kafka e armazenamento de objetos na nuvem.
Este artigo demonstra o uso de tabelas de streaming para carregar dados do armazenamento de objetos na nuvem configurado como um volume do Catálogo Unity (recomendado) ou local externo.
Nota
Para aprender a utilizar tabelas Delta Lake como fontes e destinos de streaming, consulte Leituras e gravações de streaming em tabelas Delta.
Importante
As tabelas de streaming criadas no Databricks SQL são apoiadas por um pipeline DLT sem servidor. Seu espaço de trabalho deve oferecer suporte a pipelines sem servidor para usar essa funcionalidade.
Antes de começar
Antes de começar, você deve atender aos seguintes requisitos.
Requisitos do espaço de trabalho:
- Uma conta do Azure Databricks com funcionalidade serverless ativada. Para obter mais informações, consulte Habilitar SQL warehouses sem servidor.
- Um espaço de trabalho com o Unity Catalog ativado. Para obter mais informações, consulte Configurar e gerenciar o catálogo Unity.
Requisitos de computação:
Você deve usar uma das seguintes opções:
- Um armazém SQL que utiliza o
Current
canal. - Calcule com o modo de acesso padrão (anteriormente modo de acesso compartilhado) no Databricks Runtime 13.3 LTS ou superior.
Utilize o modo de acesso dedicado (anteriormente modo de acesso de usuário único) no Databricks Runtime 15.4 LTS ou superior.
No Databricks Runtime 15.3 e inferior, não é possível usar computação dedicada para consultar tabelas de streaming que pertencem a outros usuários. Você pode usar computação dedicada no Databricks Runtime 15.3 e abaixo somente se possuir a tabela de streaming. O criador da mesa é o proprietário.
O Databricks Runtime 15.4 LTS e superior suporta consultas em tabelas geradas por DLT em computação dedicada, independentemente da propriedade da tabela. Para aproveitar a filtragem de dados fornecida no Databricks Runtime 15.4 LTS e superior, você deve confirmar se seu espaço de trabalho está habilitado para computação sem servidor porque a funcionalidade de filtragem de dados que suporta tabelas geradas por DLT é executada em computação sem servidor. Você pode ser cobrado por recursos de computação sem servidor quando usa computação dedicada para executar operações de filtragem de dados. Consulte Controlo de acesso detalhado em computação dedicada (anteriormente computação para utilizador único).
Requisitos de permissões:
- O
READ FILES
privilégio em uma localização externa do Catálogo Unity. Para obter informações, consulte Criar um local externo para conectar o armazenamento em nuvem ao Azure Databricks. - O
USE CATALOG
privilégio no catálogo no qual se cria a tabela de streaming. - O
USE SCHEMA
privilégio no esquema no qual se cria a tabela de streaming. - O
CREATE TABLE
privilégio no esquema no qual o utilizador cria a tabela de streaming.
Outros requisitos:
O caminho para os dados de origem.
Exemplo de caminho de volume:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Exemplo de caminho de local externo:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Nota
Este artigo pressupõe que os dados que você deseja carregar estejam em um local de armazenamento em nuvem que corresponda a um volume do Catálogo Unity ou local externo ao qual você tenha acesso.
Descobrir e visualizar dados de origem
Na barra lateral da área de trabalho, clique em Consultas e, em seguida, clique em Criar consulta.
No editor de consultas, selecione um SQL warehouse que utilize o canal
Current
na lista suspensa.Cole o seguinte no editor, substituindo valores entre colchetes angulares (
<>
) pelas informações que identificam seus dados de origem e clique em Executar.Nota
Se as pré-definições da função não conseguirem analisar os seus dados, pode encontrar erros de inferência de esquema ao executar a função com valor de tabela
read_files
. Por exemplo, talvez seja necessário configurar o modo de várias linhas para arquivos CSV ou JSON de várias linhas. Para obter uma lista de opções do analisador de sintaxe, consulte a função com valor de tabelaread_files
./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Carregar dados em uma tabela de streaming
Para criar uma tabela de streaming a partir de dados no armazenamento de objetos na nuvem, cole o seguinte no editor de consultas e clique em Executar:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Definir o canal de execução
As tabelas de streaming criadas usando armazéns SQL são atualizadas automaticamente usando um pipeline DLT. Os pipelines DLT usam o tempo de execução no canal current
por padrão. Consulte as notas de versão do DLT e o processo de atualização da versão para aprender sobre o processo de lançamento.
A Databricks recomenda o uso do canal current
para cargas de trabalho de produção. Novos recursos são lançados primeiro no preview
canal. Você pode definir um pipeline para o canal DLT de visualização para testar novos recursos especificando preview
como uma propriedade de tabela. Você pode especificar essa propriedade ao criar a tabela ou depois que a tabela for criada usando uma instrução ALTER.
O exemplo de código a seguir mostra como definir o canal para visualização em uma instrução CREATE:
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
Atualizar uma tabela de streaming usando um pipeline de DLT
Esta seção descreve padrões para atualizar uma tabela de streaming com os dados mais recentes disponíveis das fontes definidas na consulta.
Quando você CREATE
ou REFRESH
uma tabela de streaming, a atualização é processada usando um pipeline DLT sem servidor. Cada tabela de streaming definida tem um pipeline DLT associado.
Depois de executar o comando REFRESH
, o link de pipeline DLT é retornado. Você pode usar o link de pipeline DLT para verificar o status da atualização.
Nota
Somente o proprietário da tabela pode atualizar uma tabela de streaming para obter os dados mais recentes. O usuário que cria a tabela é o proprietário, e o proprietário não pode ser alterado. Talvez seja necessário atualizar a sua tabela de streaming antes de usar consultas de viagens no tempo.
Consulte O que é DLT?.
Ingerir apenas novos dados
Por padrão, a função read_files
lê todos os dados existentes no diretório de origem durante a criação da tabela e, em seguida, processa os registros recém-chegados a cada atualização.
Para evitar a ingestão de dados que já existem no diretório de origem no momento da criação da tabela, defina a opção includeExistingFiles
como false
. Isso significa que apenas os dados que chegam ao diretório após a criação da tabela são processados. Por exemplo:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
Atualizar totalmente uma tabela de streaming
As atualizações completas reprocessam todos os dados disponíveis na fonte com a definição mais recente. Não é recomendável chamar atualizações completas em fontes que não mantêm todo o histórico dos dados ou têm períodos de retenção curtos, como Kafka, porque a atualização completa trunca os dados existentes. Talvez não seja possível recuperar dados antigos se os dados não estiverem mais disponíveis na fonte.
Por exemplo:
REFRESH STREAMING TABLE my_bronze_table FULL
Agendar uma tabela de streaming para atualização automática
Para configurar uma tabela de streaming para atualizar automaticamente com base em uma agenda definida, cole o seguinte no editor de consultas e clique em Executar:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Por exemplo, para atualizar as consultas de programação, consulte ALTER STREAMING TABLE.
Acompanhar o status de uma atualização
Você pode verificar o status de uma atualização de tabela de streaming consultando o pipeline que gerencia a tabela de streaming na UI do DLT ou consultando as Informações de Atualização retornadas pelo comando DESCRIBE EXTENDED
para a tabela de streaming.
DESCRIBE EXTENDED <table-name>
Ingestão de dados em streaming do Kafka
Para obter um exemplo de ingestão de streaming de Kafka, consulte read_kafka.
Conceder aos usuários acesso a uma tabela de streaming
Para conceder aos usuários o SELECT
privilégio na tabela de streaming para que eles possam consultá-la, cole o seguinte no editor de consultas e clique em Executar:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Para obter mais informações sobre como conceder privilégios em objetos protegíveis do Unity Catalog, consulte Privilégios e objetos protegíveis do Unity Catalog.
Excluir permanentemente registros de uma tabela de streaming
Importante
O suporte para a instrução REORG
com tabelas de streaming está em Public Preview.
Nota
- Usar uma instrução
REORG
com uma tabela de streaming requer o Databricks Runtime 15.4 e superior. - Embora você possa usar a instrução
REORG
com qualquer tabela de streaming, ela só é necessária ao excluir registros de uma tabela de streaming com vetores de exclusão habilitados. O comando não tem efeito quando usado com uma tabela de streaming sem vetores de exclusão habilitados.
Para excluir fisicamente registros do armazenamento subjacente de uma tabela de streaming com vetores de exclusão habilitados, como para conformidade com o GDPR, etapas adicionais devem ser tomadas para garantir que uma operação de VACUUM seja executada nos dados da tabela de streaming.
A seguir descrevemos essas etapas com mais detalhes:
- Atualize registros ou exclua registros da tabela de streaming.
- Execute uma instrução
REORG
na tabela de streaming, especificando o parâmetroAPPLY (PURGE)
. Por exemplo,REORG TABLE <streaming-table-name> APPLY (PURGE);
. - Aguarde até que o período de retenção de dados da tabela de streaming passe. O período de retenção de dados padrão é de sete dias, mas pode ser configurado com a propriedade
delta.deletedFileRetentionDuration
table. Consulte Configuração de retenção de dados para consultas de viagem no tempo. -
REFRESH
a tabela de streaming. Consulte Atualizar uma tabela de streaming usando um pipeline de DLT. No prazo de 24 horas após a operaçãoREFRESH
, as tarefas de manutenção DLT, incluindo a operaçãoVACUUM
necessária para garantir que os registos são permanentemente eliminados, são executadas automaticamente. Consulte Tarefas de manutenção executadas pela DLT.
O monitor é executado usando o histórico de consultas
Você pode usar a página de histórico de consultas para aceder a detalhes de interrogações e perfis de consulta, o que pode ajudá-lo a identificar consultas de baixo desempenho e gargalos no pipeline DLT usado para executar as suas atualizações de tabelas streaming. Para obter uma visão geral do tipo de informações disponíveis em históricos de consultas e perfis de consulta, consulte Histórico de consultas e Perfil de consulta.
Importante
Esta funcionalidade está em Pré-visualização Pública. Os administradores do espaço de trabalho podem ativar esse recurso na página Visualizações . Consulte Gerenciar visualizações do Azure Databricks.
Todas as instruções relacionadas a tabelas de streaming aparecem no histórico de consultas. Pode usar o filtro suspenso de Instrução para selecionar qualquer comando e inspecionar as consultas relacionadas. Todas as instruções CREATE
são seguidas por uma instrução REFRESH
que é executada de forma assíncrona em um pipeline DLT. As REFRESH
instruções geralmente incluem planos de consulta detalhados que fornecem informações sobre a otimização do desempenho.
Para aceder a declarações REFRESH
na interface do utilizador do histórico de consultas, siga os seguintes passos:
- Clique no
na barra lateral esquerda para abrir a interface de utilizador do histórico de consultas.
- Selecione a caixa de seleção REFRESH no filtro de lista suspensa Statement.
- Clique no nome da instrução de consulta para exibir detalhes de resumo, como a duração da consulta e métricas agregadas.
- Clique em Ver perfil de consulta para abrir o perfil de consulta. Consulte Perfil de consulta para obter detalhes sobre como navegar no perfil de consulta.
- Opcionalmente, pode usar os links na secção Origem da Consulta para abrir a consulta ou o pipeline relacionado.
Você também pode acessar os detalhes da consulta usando links no editor SQL ou a partir de um bloco de anotações anexado a um SQL warehouse.