Apache Flink ile Azure Veri Gezgini'a veri alma
Apache Flink , ilişkisiz ve sınırlanmış veri akışları üzerinden durum bilgisi olan hesaplamalar için bir çerçeve ve dağıtılmış işleme altyapısıdır.
Flink bağlayıcısı, herhangi bir Flink kümesinde çalışabilen bir açık kaynak projesidir. Bir Flink kümesinden veri taşımak için veri havuzu uygular. Bağlayıcıyı Apache Flink'e kullanarak makine öğrenmesi (ML), Ayıklama-Dönüştürme-Yükleme (ETL) ve Log Analytics gibi veri temelli senaryoları hedefleyen hızlı ve ölçeklenebilir uygulamalar oluşturabilirsiniz.
Bu makalede, Flink bağlayıcısını kullanarak Flink'ten tablonuza veri göndermeyi öğreneceksiniz. Bir tablo ve veri eşlemesi oluşturur, Flink'i tabloya veri göndermeye yönlendirir ve ardından sonuçları doğrularsınız.
Önkoşullar
- Azure Veri Gezgini kümesi ve veritabanı. Microsoft Fabric'te Gerçek Zamanlı Zeka'da bir küme ve veritabanı veya KQL veritabanı oluşturun.
- Veritabanınızdaki bir hedef tablo. Bkz. Azure'da tablo oluşturma Veri Gezgini veya Gerçek Zamanlı Zeka'da tablo oluşturma
- Apache Flink kümesi. Küme oluşturma.
- Maven 3.x
Flink bağlayıcısını alma
Bağımlılıkları yönetmek için Maven kullanan Flink projeleri için, bağımlılık olarak ekleyerek Azure Veri Gezgini için Flink Bağlayıcısı Çekirdek Havuzu'nı tümleştirin:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
Bağımlılıkları yönetmek için Maven kullanmayan projeler için Apache Flink için Azure Veri Gezgini Bağlayıcısı deposunu kopyalayın ve yerel olarak derleyin. Bu yaklaşım, komutunu mvn clean install -DskipTests
kullanarak bağlayıcıyı yerel Maven deponuza el ile eklemenize olanak tanır.
Bir Microsoft Entra ID uygulaması veya yönetilen kimlik kullanarak Flink'ten kimlik doğrulaması yapabilirsiniz.
Bu hizmet sorumlusu, kusto'da tablonuza veri yazmak için bağlayıcı tarafından kullanılan kimlik olacaktır. Daha sonra bu hizmet sorumlusu için Kusto kaynaklarına erişim izinleri verirsiniz.
Azure CLI aracılığıyla Azure aboneliğinizde oturum açın. Ardından tarayıcıda kimlik doğrulaması yapın.
az login
Sorumluyu barındırmak için aboneliği seçin. Bu adım, birden çok aboneliğiniz olduğunda gereklidir.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Hizmet sorumlusunu oluşturun. Bu örnekte hizmet sorumlusu olarak adlandırılır
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Döndürülen JSON verilerinden, gelecekte kullanmak üzere ,
password
vetenant
değerini kopyalayınappId
.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Microsoft Entra uygulamanızı ve hizmet sorumlunuzu oluşturdunuz.
Uygulamaya veritabanında kullanıcı izinleri verin:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Uygulamaya tabloda alma veya yönetici izinleri verin. Gerekli izinler, seçilen veri yazma yöntemine bağlıdır. Alma izinleri SinkV2 için yeterliyken WriteAndSink yönetici izinleri gerektirir.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Yetkilendirme hakkında daha fazla bilgi için bkz . Kusto rol tabanlı erişim denetimi.
Flink'ten veri yazma
Flink'ten veri yazmak için:
Gerekli seçenekleri içeri aktarın:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Kimlik doğrulaması yapmak için uygulamanızı veya yönetilen kimliğinizi kullanın.
Uygulama kimlik doğrulaması için:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Yönetilen kimlik kimlik doğrulaması için:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Veritabanı ve tablo gibi havuz parametrelerini yapılandırın:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Aşağıdaki tabloda açıklandığı gibi daha fazla seçenek ekleyebilirsiniz:
Seçenek Açıklama Varsayılan Değer IngestionMappingRef Var olan bir alım eşlemeye başvurur. FlushImmediately Verileri hemen temizler ve performans sorunlarına neden olabilir. Bu yöntem önerilmez. BatchIntervalMs Verilerin ne sıklıkta temizlendiğini denetler. 30 saniye BatchSize Temizlemeden önce kayıtları arabelleğe alma için toplu iş boyutunu ayarlar. 1.000 kayıt ClientBatchSizeLimit Veri alımından önce toplanan verilerin MB cinsinden boyutunu belirtir. 300 MB PollForIngestionStatus Doğruysa, bağlayıcı veri temizlemeden sonra alma durumunu yoklar. yanlış DeliveryGuarantee Teslim garantisi semantiğini belirler. Tam olarak bir kez semantik elde etmek için WriteAheadSink kullanın. AT_LEAST_ONCE Akış verilerini aşağıdaki yöntemlerden biriyle yazın:
- SinkV2: Bu, denetim noktasındaki verileri temizleyerek en az bir kez tutarlılık sağlayan durum bilgisi olmayan bir seçenektir. Yüksek hacimli veri alımı için bu seçeneği öneririz.
- WriteAheadSink: Bu yöntem verileri KustoSink'e yayar. Flink'in kontrol noktası sistemiyle tümleştirilmiştir ve tam olarak bir kez garanti sunar. Veriler bir AbstractStateBackend içinde depolanır ve yalnızca bir denetim noktası tamamlandıktan sonra işlenir.
Aşağıdaki örnekte SinkV2 kullanılır. WriteAheadSink kullanmak için yerine yöntemini
build
kullanınbuildWriteAheadSink
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Kodun tamamı şuna benzer olmalıdır:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Verilerin alındığını doğrulama
Bağlantı yapılandırıldıktan sonra veriler tablonuza gönderilir. Bir KQL sorgusu çalıştırarak verilerin alındığını doğrulayabilirsiniz.
Verilerin tabloya alındığını doğrulamak için aşağıdaki sorguyu çalıştırın:
<MyTable> | count
Verileri görüntülemek için aşağıdaki sorguyu çalıştırın:
<MyTable> | take 100
İlgili içerik
- Sorgu yazma
- Azure Veri Gezgini ile AKS üzerinde Azure HDInsight üzerinde Apache Flink kullanma