Kusto Java SDK'sını kullanarak verileri alma
Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve yüksek oranda ölçeklenebilir veri keşfetme hizmetidir. Java istemci kitaplığı Azure Veri Gezgini kümelerindeki verileri almak, yönetim komutları vermek ve verileri sorgulamak için kullanılabilir.
Bu makalede, Azure Veri Gezgini Java kitaplığını kullanarak veri almayı öğrenin. İlk olarak, bir test kümesinde bir tablo ve veri eşlemesi oluşturacaksınız. Ardından Java SDK'sını kullanarak blob depolamadan kümeye bir alımı kuyruğa alır ve sonuçları doğrularsınız.
Önkoşullar
- Microsoft hesabı veya Microsoft Entra kullanıcı kimliği. Azure aboneliği gerekmez.
- Azure Veri Gezgini kümesi ve veritabanı. Küme ve veritabanı oluşturma.
- Git.
- JDK sürüm 1.8 veya üzeri.
- Maven.
- Bir Uygulama Kaydı oluşturun ve veritabanına izin verin. İstemci kimliğini ve gizli diziyi daha sonra kullanmak üzere kaydedin.
Kodu gözden geçirin
Bu bölüm isteğe bağlıdır. Kodun nasıl çalıştığını öğrenmek için aşağıdaki kod parçacıklarını gözden geçirin. Bu bölümü atlamak için uygulamayı çalıştırmaya gidin.
Kimlik Doğrulaması
Program, ConnectionStringBuilder' ile Microsoft Entra kimlik doğrulaması kimlik bilgilerini kullanır.
Sorgu ve yönetim için bir
com.microsoft.azure.kusto.data.Client
oluşturun.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Azure Veri Gezgini veri alımını kuyruğa almak için bir
com.microsoft.azure.kusto.ingest.IngestClient
oluşturun ve kullanın:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Yönetim komutları
ve .create
gibi .drop
yönetim komutları bir com.microsoft.azure.kusto.data.Client
nesne üzerinde çağrılarak execute
yürütülür.
Örneğin, StormEvents
tablo aşağıdaki gibi oluşturulur:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Veri alımı
Mevcut bir Azure Blob Depolama kapsayıcısından dosya kullanarak kuyruk alımı.
- Blob Depolama yolunu belirtmek için kullanın
BlobSourceInfo
. - Tablo, veritabanı, eşleme adı ve veri türünü tanımlamak için kullanın
IngestionProperties
. Aşağıdaki örnekte veri türü şeklindedirCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Alma işlemi ayrı bir iş parçacığında başlar ve main
iş parçacığı alım iş parçacığının tamamlanmasını bekler. Bu işlem CountdownLatch kullanır. Alma API'si (IngestClient#ingestFromBlob
) zaman uyumsuz değildir. Döngü while
, geçerli durumu her 5 saniyede bir yoklamak için kullanılır ve alma durumunun farklı bir duruma geçmesini Pending
bekler. Son durum , Failed
veya PartiallySucceeded
olabilirSucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
İpucu
Farklı uygulamalar için alma işlemini zaman uyumsuz olarak işlemek için başka yöntemler de vardır. Örneğin, tablonun sorgulanması veya 'a bildirilen özel durumları işleme gibi alım sonrası eylemi tanımlayan bir işlem hattı oluşturmak için IngestionStatus
kullanabilirsinizCompletableFuture
.
Uygulamayı çalıştırma
Genel
Örnek kodu çalıştırdığınızda aşağıdaki eylemler gerçekleştirilir:
- Bırakma tablosu:
StormEvents
tablo bırakılır (varsa). - Tablo oluşturma:
StormEvents
tablo oluşturulur. - Eşleme oluşturma:
StormEvents_CSV_Mapping
eşleme oluşturulur. - Dosya alımı: Bir CSV dosyası (Azure Blob Depolama) alma için kuyruğa alınır.
Aşağıdaki örnek koddan alınmalıdır App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
İpucu
farklı işlem birleşimlerini denemek için içindeki ilgili yöntemleri açıklamayı kaldırın/açıklamayı App.java
kaldırın.
Uygulamayı çalıştırma
GitHub'dan örnek kodu kopyalayın:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Hizmet sorumlusu bilgilerini aşağıdaki bilgilerle birlikte program tarafından kullanılan ortam değişkenleri olarak ayarlayın:
- Küme uç noktası
- Veritabanı adı
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Derleme ve çalıştırma:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Çıkış şuna benzer olacaktır:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Alma işleminin tamamlanması için birkaç dakika bekleyin. Başarıyla tamamlandıktan sonra şu günlük iletisini görürsünüz: Ingestion completed successfully
. Bu noktada programdan çıkıp, zaten kuyruğa alınmış olan alma işlemini etkilemeden sonraki adıma geçebilirsiniz.
Doğrulama
Kuyruğa alınan alımın alım işlemini zamanlaması ve verileri Azure Veri Gezgini'a yüklemesi için beş ile 10 dakika arasında bekleyin.
https://dataexplorer.azure.com adresinde oturum açın ve kümenize bağlanın.
Tablodaki kayıtların
StormEvents
sayısını almak için aşağıdaki komutu çalıştırın:StormEvents | count
Sorun giderme
Son dört saat içindeki alma hatalarını görmek için veritabanınızda aşağıdaki komutu çalıştırın:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Son dört saat içindeki tüm alma işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Kaynakları temizleme
Oluşturduğunuz kaynakları kullanmayı planlamıyorsanız, aşağıdaki komutu veritabanınızda çalıştırarak tabloyu bırakın StormEvents
.
.drop table StormEvents