Processamento de fluxo com Apache Kafka e Azure Databricks
Este artigo descreve como você pode usar o Apache Kafka como uma fonte ou um coletor ao executar cargas de trabalho de Streaming Estruturado no Azure Databricks.
Para mais informações sobre Kafka, consulte a documentação de Kafka.
Ler dados de Kafka
A seguir está um exemplo para uma leitura de streaming de Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
O Azure Databricks também dá suporte à semântica de leitura em lote para fontes de dados Kafka, conforme mostrado no exemplo a seguir:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Para carregamento incremental em lote, o Databricks recomenda o uso do Kafka com Trigger.AvailableNow
. Consulte A configurar o processamento em lotes incrementais.
No Databricks Runtime 13.3 LTS e superior, o Azure Databricks fornece uma função SQL para ler dados Kafka. O streaming com SQL é suportado apenas em DLT ou com tabelas de streaming em Databricks SQL. Consulte read_kafka
função com valor de tabela.
Configurar o leitor de Streaming Estruturado do Kafka
O Azure Databricks fornece a palavra-chave kafka
como um formato de dados para configurar conexões com o Kafka 0.10+.
A seguir estão as configurações mais comuns para Kafka:
Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer apenas um destes parâmetros:
Opção | valor | Descrição |
---|---|---|
subscrever | Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
padrãoDeSubscrição | Cadeia de caracteres regex Java. | O padrão usado para se inscrever no(s) tópico(s). |
atribuir | Cadeia de caracteres JSON {"topicA":[0,1],"topic":[2,4]} . |
Tópico específicoPartições a consumir. |
Outras configurações notáveis:
Opção | valor | Valor Predefinido | Descrição |
---|---|---|---|
kafka.bootstrap.servidores | Lista separada por vírgulas de host:port. | vazio | [Obrigatório] Configuração Kafka bootstrap.servers . Se você achar que não há dados de Kafka, verifique a lista de endereços do corretor primeiro. Se a lista de endereços do corretor estiver incorreta, pode não haver erros. Isso ocorre porque o cliente Kafka assume que os corretores ficarão disponíveis eventualmente e, em caso de erros de rede, continuará a tentar de novo indefinidamente. |
falhaEmPerdaDeDados |
true ou false . |
true |
[Opcional] Se a consulta deve falhar quando é possível que os dados tenham sido perdidos. As consultas podem falhar permanentemente ao ler dados de Kafka devido a muitos cenários, como tópicos excluídos, truncamento de tópico antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados foram possivelmente perdidos ou não. Às vezes, isso pode causar falsos alarmes. Defina essa opção como false se ela não funcionar conforme o esperado ou se você quiser que a consulta continue processando apesar da perda de dados. |
minPartições | Inteiro >= 0, 0 = desativado. | 0 (desativado) | [Opcional] Número mínimo de partições a ler a partir de Kafka. Você pode configurar o Spark para usar um mínimo arbitrário de partições para ler do Kafka usando a minPartitions opção. Normalmente, o Spark tem um mapeamento 1-1 das partições de tópicos de Kafka para as partições Spark que consomem do Kafka. Se você definir a opção minPartitions para um valor maior do que o seu tópico KafkaPartitions, o Spark dividirá partições Kafka grandes em partes menores. Essa opção pode ser definida em momentos de pico de cargas, distorção de dados e à medida que seu fluxo está ficando para trás para aumentar a taxa de processamento. Isso acarreta um custo de inicialização de consumidores de Kafka a cada disparo, o que pode afetar o desempenho caso use SSL para se conectar ao Kafka. |
kafka.group.id | Um ID de grupo de consumidores do Kafka. | não definido | [Opcional] ID de grupo para usar durante a leitura de Kafka. Utilize isto com precaução. Por padrão, cada consulta gera um ID de grupo exclusivo para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos inscritos. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), convém usar IDs de grupo autorizadas específicas para ler dados. Opcionalmente, você pode definir o ID do grupo. No entanto, faça isso com extrema cautela, pois pode causar um comportamento inesperado.
|
iniciandoOffsets | mais cedo , mais recente | mais recente | [Opcional] O ponto de início quando uma consulta é iniciada, ou "mais cedo", que é dos primeiros deslocamentos, ou uma string JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 pode ser usado como um offset para se referir ao mais cedo e -1 ao mais tarde. Nota: Para consultas em lote, a versão mais recente (implicitamente ou usando -1 no JSON) não é permitida. Para consultas de streaming, isto só se aplica quando uma nova consulta é iniciada, e a retomada sempre continuará de onde a consulta parou. As partições recém-descobertas durante uma consulta começarão no início. |
Consulte o Guia de Integração do Kafka de Streaming Estruturado para obter outras configurações opcionais.
Esquema para registros Kafka
O esquema dos registros de Kafka é:
Coluna | Tipo |
---|---|
chave | binário |
valor | binário |
tópico | string |
partição | int |
Compensação | longo |
carimbo de data/hora | longo |
timestampType | int |
O key
e o value
são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer
. Use operações DataFrame (como cast("string")
) para desserializar explicitamente as chaves e valores.
Gravar dados em Kafka
Segue-se um exemplo de uma gravação de streaming para Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
O Azure Databricks também dá suporte à semântica de gravação em lote para coletores de dados Kafka, conforme mostrado no exemplo a seguir:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Configurar o gravador de Kafka Structured Streaming
Importante
O Databricks Runtime 13.3 LTS e superior inclui uma versão mais recente da kafka-clients
biblioteca que permite gravações idempotentes por padrão. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE
habilitado, a gravação falhará com a mensagem org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
de erro.
Resolva esse erro atualizando para Kafka versão 2.8.0 ou superior, ou definindo .option(“kafka.enable.idempotence”, “false”)
ao configurar seu gravador de Streaming Estruturado.
O esquema fornecido ao DataStreamWriter interage com o coletor Kafka. Você pode usar os seguintes campos:
Nome da coluna | Obrigatório ou opcional | Tipo |
---|---|---|
key |
opcional |
STRING ou BINARY |
value |
obrigatório |
STRING ou BINARY |
headers |
opcional | ARRAY |
topic |
opcional (ignorado se topic estiver definido como opção de gravador) |
STRING |
partition |
opcional | INT |
A seguir estão as opções comuns definidas ao escrever para Kafka:
Opção | valor | Valor predefinido | Descrição |
---|---|---|---|
kafka.boostrap.servers |
Uma lista separada por vírgulas de <host:port> |
nenhum | [Obrigatório] A configuração de Kafka bootstrap.servers . |
topic |
STRING |
não definido | [Opcional] Define o tópico para todas as linhas a serem escritas. Esta opção substitui qualquer coluna de tópico que exista nos dados. |
includeHeaders |
BOOLEAN |
false |
[Opcional] Se devem ser incluídos os cabeçalhos de Kafka na linha. |
Consulte o Guia de Integração do Kafka de Streaming Estruturado para obter outras configurações opcionais.
Recuperar métricas de Kafka
Você pode obter a média, o mínimo e o máximo do número de offsets que a consulta de streaming está atrás do último offset disponível entre todos os tópicos subscritos, utilizando as métricas avgOffsetsBehindLatest
, maxOffsetsBehindLatest
, e minOffsetsBehindLatest
. Consulte Leitura de métricas interativamente.
Nota
Disponível no Databricks Runtime 9.1 e superior.
Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest
. Esta estimativa baseia-se nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo a opção bytesEstimateWindowLength
para um valor diferente. Por exemplo, para defini-lo como 10 minutos:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Se estiver a executar o fluxo num notebook, pode ver estas métricas na guia Dados Brutos no painel de progresso da consulta de streaming.
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Usar SSL para conectar o Azure Databricks ao Kafka
Para habilitar conexões SSL com Kafka, siga as instruções na documentação do Confluent Criptografia e autenticação com SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka.
, como opções. Por exemplo, você especifica o local de armazenamento confiável na propriedade kafka.ssl.truststore.location
.
A Databricks recomenda que você:
- Armazene seus certificados no armazenamento de objetos na nuvem. Você pode restringir o acesso aos certificados apenas aos clusters que podem acessar o Kafka. Consulte Governança de dados com o Catálogo Unity.
- Armazene suas senhas de certificado como segredos em um escopo secreto.
O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Conectar o Kafka no HDInsight ao Azure Databricks
Crie um cluster HDInsight Kafka.
Consulte Conectar-se ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para obter instruções.
Configure os corretores Kafka para anunciar o endereço correto.
Siga as instruções em Configurar o Kafka para anunciar o IP. Se você gerencia o Kafka por conta própria nas Máquinas Virtuais do Azure, certifique-se de que a configuração
advertised.listeners
dos brokers esteja definida como o IP interno dos hosts.Crie um cluster do Azure Databricks.
Emparelhe o cluster Kafka ao cluster do Azure Databricks.
Siga as instruções em Redes virtuais entre pares.
Autenticação do principal de serviço com o Microsoft Entra ID e os Hubs de Eventos do Azure
O Azure Databricks dá suporte à autenticação de trabalhos do Spark com serviços de Hubs de Eventos. Essa autenticação é feita via OAuth com o Microsoft Entra ID.
O Azure Databricks dá suporte à autenticação de ID do Microsoft Entra com uma ID de cliente e segredo nos seguintes ambientes de computação:
- Databricks Runtime 12.2 LTS e superior em computação configurada com modo de acesso dedicado (anteriormente modo de acesso de usuário único).
- Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso padrão (anteriormente modo de acesso compartilhado).
- Pipelines de DLT configurados sem o Unity Catalog.
O Azure Databricks não oferece suporte à autenticação de ID do Microsoft Entra com um certificado em qualquer ambiente de computação ou em pipelines DLT configurados com o Unity Catalog.
Essa autenticação não funciona na computação com modo de acesso padrão ou no Unity Catalog DLT.
Configurando o conector Kafka de streaming estruturado
Para executar a autenticação com o Microsoft Entra ID, você precisará dos seguintes valores:
Um ID de locatário. Você pode encontrar isso na guia Serviços do Microsoft Entra ID .
Um clientID (também conhecido como ID do aplicativo).
Um segredo do cliente. Depois de ter isso, você deve adicioná-lo como um segredo ao seu espaço de trabalho Databricks. Para adicionar esse segredo, consulte Gerenciamento secreto.
Um tópico do EventHubs. Pode encontrar uma lista de tópicos na secção Hubs de Eventos na secção Entidades numa página específica de Namespace de Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função do IAM no nível dos Hubs de Eventos.
Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico de Hubs de Eventos:
Além disso, para usar o Entra ID, precisamos dizer a Kafka para usar o mecanismo SASL OAuth (SASL é um protocolo genérico, e OAuth é um tipo de "mecanismo" SASL):
-
kafka.security.protocol
deve serSASL_SSL
-
kafka.sasl.mechanism
deve serOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
deve ser um nome completamente qualificado da classe Java ekafkashaded
deve ser atribuído como valor ao manipulador de retorno de chamada de login da nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.
Exemplo
Em seguida, vejamos um exemplo em execução:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Tratamento de possíveis erros
As opções de streaming não são suportadas.
Se você tentar usar esse mecanismo de autenticação em um pipeline DLT configurado com o Unity Catalog, poderá receber o seguinte erro:
Para resolver esse erro, use uma configuração de computação suportada. Consulte Autenticação da entidade de serviço com Microsoft Entra ID e Azure Event Hubs.
Falha ao criar um novo
KafkaAdminClient
.Este é um erro interno que Kafka lança se qualquer uma das seguintes opções de autenticação estiver incorreta:
- ID do cliente (também conhecido como ID do aplicativo)
- Identificador de Inquilino
- Servidor EventHubs
Para resolver o erro, verifique se os valores estão corretos para essas opções.
Além disso, poderá ver este erro se modificar as opções de configuração fornecidas por predefinição no exemplo (que lhe foi pedido para não modificar), como
kafka.security.protocol
.Não há registros sendo devolvidos
Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.
Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Algumas razões possíveis (embora não exaustivas) são:
- Você especificou o tópico EventHubs errado.
- A opção de configuração padrão do Kafka para
startingOffsets
élatest
, e você ainda não está recebendo nenhum dado através do tópico. Você pode definirstartingOffsetstoearliest
para começar a ler dados a partir dos primeiros offsets de Kafka.