Bagikan melalui


Menyerap data dari Apache Kafka ke Azure Data Explorer

Apache Kafka adalah platform streaming terdistribusi untuk membangun alur data streaming real-time yang dengan andal memindahkan data antara sistem atau aplikasi. Kafka Connect adalah alat untuk streaming data yang dapat diskalakan dan dapat diandalkan antara Apache Kafka dan sistem data lainnya. Sink Kusto Kafka berfungsi sebagai konektor dari Kafka dan tidak memerlukan penggunaan kode. Unduh jar konektor sink dari repositori Git atau Confluent Connector Hub.

Artikel ini menunjukkan cara menyerap data dengan Kafka, menggunakan penyiapan Docker mandiri untuk menyederhanakan penyiapan kluster Kafka dan kluster konektor Kafka.

Untuk informasi selengkapnya, lihat repositori Git konektor dan spesifikasi versi.

Prasyarat

Membuat perwakilan layanan Microsoft Entra

Perwakilan layanan Microsoft Entra dapat dibuat melalui portal Azure atau secara terprogram, seperti dalam contoh berikut.

Perwakilan layanan ini adalah identitas yang digunakan oleh konektor untuk menulis data tabel Anda di Kusto. Anda memberikan izin bagi perwakilan layanan ini untuk mengakses sumber daya Kusto.

  1. Masuk ke langganan Azure Anda melalui Azure CLI. Kemudian autentikasi di browser.

    az login
    
  2. Pilih langganan untuk menghosting perwakilan. Langkah ini diperlukan saat Anda memiliki beberapa langganan.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Buat perwakilan layanan. Dalam contoh ini, perwakilan layanan disebut my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dari data JSON yang dikembalikan, salin appId, password, dan tenant untuk penggunaan di masa mendatang.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Anda telah membuat aplikasi Microsoft Entra dan perwakilan layanan Anda.

Membuat tabel target

  1. Dari lingkungan kueri Anda, buat tabel yang disebut Storms menggunakan perintah berikut:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Buat pemetaan Storms_CSV_Mapping tabel terkait untuk data yang diserap menggunakan perintah berikut:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Buat kebijakan batching penyerapan pada tabel untuk latensi penyerapan antrean yang dapat dikonfigurasi.

    Tip

    Kebijakan batching penyerapan adalah pengoptimal performa dan mencakup tiga parameter. Kondisi pertama yang dipenuhi memicu penyerapan ke dalam tabel.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Gunakan perwakilan layanan dari Buat perwakilan layanan Microsoft Entra untuk memberikan izin untuk bekerja dengan database.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Jalankan lab

Lab berikut dirancang untuk memberi Anda pengalaman mulai membuat data, menyiapkan konektor Kafka, dan mengalirkan data ini. Anda kemudian dapat melihat data yang diserap.

Mengkloning repositori git

Kloning repositori git lab.

  1. Buat direktori lokal di komputer Anda.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Kloning repositori.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Isi repositori kloning

Jalankan perintah berikut untuk mencantumkan konten repositori kloning:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Hasil dari pencarian ini adalah:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Meninjau file dalam repositori kloning

Bagian berikut menjelaskan bagian penting dari file di pohon file.

adx-sink-config.json

File ini berisi file properti sink Kusto tempat Anda memperbarui detail konfigurasi tertentu:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Ganti nilai untuk atribut berikut sesuai penyiapan Anda: aad.auth.authority, , aad.auth.appid, kusto.tables.topics.mapping aad.auth.appkey(nama database), kusto.ingestion.url, dan kusto.query.url.

Konektor - Dockerfile

File ini memiliki perintah untuk menghasilkan gambar docker untuk instans konektor. Ini termasuk unduhan konektor dari direktori rilis repositori git.

Direktori storm-events-producer

Direktori ini memiliki program Go yang membaca file "StormEvents.csv" lokal dan menerbitkan data ke topik Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Memulai kontainer

  1. Di terminal, mulai kontainer:

    docker-compose up
    

    Aplikasi produser mulai mengirim peristiwa ke topik tersebut storm-events . Anda akan melihat log yang mirip dengan log berikut:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Untuk memeriksa log, jalankan perintah berikut di terminal terpisah:

    docker-compose logs -f | grep kusto-connect
    

Memulai konektor

Gunakan panggilan REST Kafka Connect untuk memulai konektor.

  1. Di terminal terpisah, luncurkan tugas sink dengan perintah berikut:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Untuk memeriksa status, jalankan perintah berikut di terminal terpisah:

    curl http://localhost:8083/connectors/storm/status
    

Konektor mulai mengantre proses penyerapan.

Catatan

Jika Anda mengalami masalah konektor log, buat masalah.

Identitas terkelola

Secara default, konektor Kafka menggunakan metode aplikasi untuk autentikasi selama penyerapan. Untuk mengautentikasi menggunakan identitas terkelola:

  1. Tetapkan identitas terkelola kluster Anda dan berikan izin baca akun penyimpanan Anda. Untuk informasi selengkapnya, lihat Menyerap data menggunakan autentikasi identitas terkelola.

  2. Dalam file adx-sink-config.json Anda, atur aad.auth.strategy ke managed_identity dan pastikan diatur aad.auth.appid ke ID klien identitas terkelola (aplikasi).

  3. Gunakan token layanan metadata instans privat alih-alih perwakilan layanan Microsoft Entra.

Catatan

Saat menggunakan identitas terkelola, appId dan tenant disimpulkan dari konteks situs panggilan dan password tidak diperlukan.

Mengkueri dan meninjau data

Mengonfirmasi penyerapan data

  1. Setelah data tiba dalam Storms tabel, konfirmasikan transfer data, dengan memeriksa jumlah baris:

    Storms 
    | count
    
  2. Konfirmasikan bahwa tidak ada kegagalan dalam proses penyerapan:

    .show ingestion failures
    

    Setelah Anda melihat data, cobalah beberapa kueri.

Mengueri data

  1. Untuk melihat semua rekaman, jalankan kueri berikut:

    Storms
    | take 10
    
  2. Gunakan where dan project untuk memfilter data tertentu:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. summarize Gunakan operator:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Cuplikan layar hasil bagan kolom kueri Kafka yang tersambung.

Untuk contoh dan panduan kueri lainnya, lihat Menulis kueri di dokumentasi KQL dan Bahasa Kueri Kusto.

Reset

Untuk mengatur ulang, lakukan langkah-langkah berikut:

  1. Hentikan kontainer (docker-compose down -v)
  2. Hapus (drop table Storms)
  3. Membuat Storms ulang tabel
  4. Membuat ulang pemetaan tabel
  5. Mulai ulang kontainer (docker-compose up)

Membersihkan sumber daya

Untuk menghapus sumber daya Azure Data Explorer, gunakan az kusto cluster delete (ekstensi kusto) atau az kusto database delete (ekstensi kusto):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Anda juga dapat menghapus kluster dan database Anda melalui portal Azure. Untuk informasi selengkapnya, lihat Menghapus kluster Azure Data Explorer dan Menghapus database di Azure Data Explorer.

Menyetel konektor Kafka Sink

Sesuaikan konektor Kafka Sink untuk bekerja dengan kebijakan batching penyerapan:

  • Sesuaikan batas ukuran Kafka Sink flush.size.bytes mulai dari 1 MB, meningkat dengan kenaikan 10 MB atau 100 MB.
  • Saat menggunakan Kafka Sink, data dikumpulkan dua kali. Pada data sisi konektor dikumpulkan sesuai dengan pengaturan flush, dan di sisi layanan sesuai dengan kebijakan batching. Jika waktu batching terlalu singkat sehingga data tidak dapat diserap oleh konektor dan layanan, waktu batching harus ditingkatkan. Atur ukuran batching pada 1 GB dan tingkatkan atau kurangi dengan kenaikan 100 MB sesuai kebutuhan. Misalnya, jika ukuran flush adalah 1 MB dan ukuran kebijakan batching adalah 100 MB, konektor Kafka Sink menggabungkan data ke dalam batch 100-MB. Batch itu kemudian diserap oleh layanan. Jika waktu kebijakan batching adalah 20 detik dan konektor Kafka Sink menyiram 50 MB dalam periode 20 detik, maka layanan menyerap batch 50 MB.
  • Anda dapat menskalakan dengan menambahkan instans dan partisi Kafka. Tingkatkan tasks.max ke jumlah partisi. Buat partisi jika Anda memiliki cukup data untuk menghasilkan blob dengan ukuran flush.size.bytes pengaturan. Jika blob lebih kecil, batch diproses ketika mencapai batas waktu, sehingga partisi tidak menerima throughput yang cukup. Sejumlah besar partisi berarti lebih banyak pemrosesan overhead.