Menyerap data dengan Apache Flink ke Azure Data Explorer
Apache Flink adalah kerangka kerja dan mesin pemrosesan terdistribusi untuk komputasi stateful melalui aliran data yang tidak terbatas dan terikat.
Konektor Flink adalah proyek sumber terbuka yang dapat berjalan pada kluster Flink apa pun. Ini mengimplementasikan sink data untuk memindahkan data dari kluster Flink. Dengan menggunakan konektor ke Apache Flink, Anda dapat membangun aplikasi yang cepat dan dapat diskalakan yang menargetkan skenario berbasis data, misalnya, pembelajaran mesin (ML), Extract-Transform-Load (ETL), dan Analitik Log.
Dalam artikel ini, Anda mempelajari cara menggunakan konektor Flink untuk mengirim data dari Flink ke tabel Anda. Anda membuat tabel dan pemetaan data, mengarahkan Flink untuk mengirim data ke dalam tabel, lalu memvalidasi hasilnya.
Prasyarat
- Kluster dan database Azure Data Explorer. Buat kluster dan database atau database KQL di Real-Time Intelligence di Microsoft Fabric.
- Tabel target di database Anda. Lihat Membuat tabel di Azure Data Explorer atau Membuat tabel di Kecerdasan Real Time
- Kluster Apache Flink. Membuat cluster.
- Maven 3.x
Mendapatkan konektor Flink
Untuk proyek Flink yang menggunakan Maven untuk mengelola dependensi, integrasikan Flink Connector Core Sink For Azure Data Explorer dengan menambahkannya sebagai dependensi:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
Untuk proyek yang tidak menggunakan Maven untuk mengelola dependensi, kloning repositori untuk Konektor Azure Data Explorer untuk Apache Flink dan buat secara lokal. Pendekatan ini memungkinkan Anda untuk menambahkan konektor secara manual ke repositori Maven lokal Anda menggunakan perintah mvn clean install -DskipTests
.
Anda dapat mengautentikasi dari Flink untuk menggunakan aplikasi ID Microsoft Entra atau identitas terkelola.
Perwakilan layanan ini akan menjadi identitas yang digunakan oleh konektor untuk menulis data tabel Anda di Kusto. Anda nantinya akan memberikan izin bagi perwakilan layanan ini untuk mengakses sumber daya Kusto.
Masuk ke langganan Azure Anda melalui Azure CLI. Kemudian autentikasi di browser.
az login
Pilih langganan untuk menghosting perwakilan. Langkah ini diperlukan saat Anda memiliki beberapa langganan.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Buat perwakilan layanan. Dalam contoh ini, perwakilan layanan disebut
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Dari data JSON yang dikembalikan, salin
appId
,password
, dantenant
untuk penggunaan di masa mendatang.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Anda telah membuat aplikasi Microsoft Entra dan perwakilan layanan Anda.
Berikan izin pengguna aplikasi pada database:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Berikan aplikasi izin ingestor atau admin pada tabel. Izin yang diperlukan bergantung pada metode penulisan data yang dipilih. Izin ingestor cukup untuk SinkV2, sementara WriteAndSink memerlukan izin admin.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Untuk informasi selengkapnya tentang otorisasi, lihat Kontrol akses berbasis peran Kusto.
Menulis data dari Flink
Untuk menulis data dari Flink:
Impor opsi yang diperlukan:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Gunakan aplikasi atau identitas terkelola Anda untuk Mengautentikasi.
Untuk autentikasi aplikasi:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Untuk autentikasi identitas terkelola:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Konfigurasikan parameter sink seperti database dan tabel:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Anda bisa menambahkan opsi lainnya, seperti yang dijelaskan dalam tabel berikut ini:
Opsi Deskripsi Nilai Default IngestionMappingRef Mereferensikan pemetaan penyerapan yang ada. FlushImmediately Menghapus data dengan segera, dan dapat menyebabkan masalah performa. Metode ini tidak disarankan. BatchIntervalMs Mengontrol seberapa sering data dibersihkan. 30 detik BatchSize Mengatur ukuran batch untuk rekaman buffering sebelum pembilasan. 1.000 rekaman ClientBatchSizeLimit Menentukan ukuran dalam MB data agregat sebelum penyerapan. 300 MB PollForIngestionStatus Jika true, konektor melakukan polling untuk status penyerapan setelah data dihapus. salah DeliveryGuarantee Menentukan semantik jaminan pengiriman. Untuk mencapai semantik tepat sekali, gunakan WriteAheadSink. AT_LEAST_ONCE Tulis data streaming dengan salah satu metode berikut:
- SinkV2: Ini adalah opsi stateless yang menghapus data pada titik pemeriksaan, memastikan setidaknya sekali konsistensi. Kami merekomendasikan opsi ini untuk penyerapan data volume tinggi.
- WriteAheadSink: Metode ini memancarkan data ke KustoSink. Ini terintegrasi dengan sistem titik pemeriksaan Flink dan menawarkan jaminan sekali persis. Data disimpan dalam AbstractStateBackend dan dilakukan hanya setelah titik pemeriksaan selesai.
Contoh berikut menggunakan SinkV2. Untuk menggunakan WriteAheadSink, gunakan
buildWriteAheadSink
metode alih-alihbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Kode lengkap akan terlihat seperti ini:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Verifikasi bahwa data diserap
Setelah koneksi dikonfigurasi, data dikirim ke tabel Anda. Anda dapat memverifikasi bahwa data diserap dengan menjalankan kueri KQL.
Jalankan kueri berikut untuk memverifikasi bahwa data diserap ke dalam tabel:
<MyTable> | count
Jalankan kueri berikut untuk menampilkan data:
<MyTable> | take 100