Aracılığıyla paylaş


Kusto Java SDK'sını kullanarak verileri alma

Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve yüksek oranda ölçeklenebilir veri keşfetme hizmetidir. Java istemci kitaplığı Azure Veri Gezgini kümelerindeki verileri almak, yönetim komutları vermek ve verileri sorgulamak için kullanılabilir.

Bu makalede, Azure Veri Gezgini Java kitaplığını kullanarak veri almayı öğrenin. İlk olarak, bir test kümesinde bir tablo ve veri eşlemesi oluşturacaksınız. Ardından Java SDK'sını kullanarak blob depolamadan kümeye bir alımı kuyruğa alır ve sonuçları doğrularsınız.

Önkoşullar

Kodu gözden geçirin

Bu bölüm isteğe bağlıdır. Kodun nasıl çalıştığını öğrenmek için aşağıdaki kod parçacıklarını gözden geçirin. Bu bölümü atlamak için uygulamayı çalıştırmaya gidin.

Kimlik Doğrulaması

Program, ConnectionStringBuilder' ile Microsoft Entra kimlik doğrulaması kimlik bilgilerini kullanır.

  1. Sorgu ve yönetim için bir com.microsoft.azure.kusto.data.Client oluşturun.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Azure Veri Gezgini veri alımını kuyruğa almak için bir com.microsoft.azure.kusto.ingest.IngestClient oluşturun ve kullanın:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Yönetim komutları

ve .creategibi .drop yönetim komutları bir com.microsoft.azure.kusto.data.Client nesne üzerinde çağrılarak execute yürütülür.

Örneğin, StormEvents tablo aşağıdaki gibi oluşturulur:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Veri alımı

Mevcut bir Azure Blob Depolama kapsayıcısından dosya kullanarak kuyruk alımı.

  • Blob Depolama yolunu belirtmek için kullanın BlobSourceInfo .
  • Tablo, veritabanı, eşleme adı ve veri türünü tanımlamak için kullanın IngestionProperties . Aşağıdaki örnekte veri türü şeklindedir CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Alma işlemi ayrı bir iş parçacığında başlar ve main iş parçacığı alım iş parçacığının tamamlanmasını bekler. Bu işlem CountdownLatch kullanır. Alma API'si (IngestClient#ingestFromBlob) zaman uyumsuz değildir. Döngü while , geçerli durumu her 5 saniyede bir yoklamak için kullanılır ve alma durumunun farklı bir duruma geçmesini Pending bekler. Son durum , Failedveya PartiallySucceededolabilirSucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

İpucu

Farklı uygulamalar için alma işlemini zaman uyumsuz olarak işlemek için başka yöntemler de vardır. Örneğin, tablonun sorgulanması veya 'a bildirilen özel durumları işleme gibi alım sonrası eylemi tanımlayan bir işlem hattı oluşturmak için IngestionStatuskullanabilirsinizCompletableFuture.

Uygulamayı çalıştırma

Genel

Örnek kodu çalıştırdığınızda aşağıdaki eylemler gerçekleştirilir:

  1. Bırakma tablosu: StormEvents tablo bırakılır (varsa).
  2. Tablo oluşturma: StormEvents tablo oluşturulur.
  3. Eşleme oluşturma: StormEvents_CSV_Mapping eşleme oluşturulur.
  4. Dosya alımı: Bir CSV dosyası (Azure Blob Depolama) alma için kuyruğa alınır.

Aşağıdaki örnek koddan alınmalıdır App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

İpucu

farklı işlem birleşimlerini denemek için içindeki ilgili yöntemleri açıklamayı kaldırın/açıklamayı App.javakaldırın.

Uygulamayı çalıştırma

  1. GitHub'dan örnek kodu kopyalayın:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Hizmet sorumlusu bilgilerini aşağıdaki bilgilerle birlikte program tarafından kullanılan ortam değişkenleri olarak ayarlayın:

    • Küme uç noktası
    • Veritabanı adı
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Derleme ve çalıştırma:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Çıkış şuna benzer olacaktır:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Alma işleminin tamamlanması için birkaç dakika bekleyin. Başarıyla tamamlandıktan sonra şu günlük iletisini görürsünüz: Ingestion completed successfully. Bu noktada programdan çıkıp, zaten kuyruğa alınmış olan alma işlemini etkilemeden sonraki adıma geçebilirsiniz.

Doğrulama

Kuyruğa alınan alımın alım işlemini zamanlaması ve verileri Azure Veri Gezgini'a yüklemesi için beş ile 10 dakika arasında bekleyin.

  1. https://dataexplorer.azure.com adresinde oturum açın ve kümenize bağlanın.

  2. Tablodaki kayıtların StormEvents sayısını almak için aşağıdaki komutu çalıştırın:

    StormEvents | count
    

Sorun giderme

  1. Son dört saat içindeki alma hatalarını görmek için veritabanınızda aşağıdaki komutu çalıştırın:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Son dört saat içindeki tüm alma işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Kaynakları temizleme

Oluşturduğunuz kaynakları kullanmayı planlamıyorsanız, aşağıdaki komutu veritabanınızda çalıştırarak tabloyu bırakın StormEvents .

.drop table StormEvents