Delen via


Gegevens opnemen met behulp van de Kusto Java SDK

Azure Data Explorer is een snelle en zeer schaalbare service voor gegevensverkenning voor telemetrische gegevens en gegevens uit logboeken. De Java-clientbibliotheek kan worden gebruikt om gegevens op te nemen, opdrachten voor probleembeheer op te nemen en query's uit te voeren op gegevens in Azure Data Explorer-clusters.

In dit artikel leert u hoe u gegevens opneemt met behulp van de Azure Data Explorer Java-bibliotheek. Eerst maakt u een tabel en een gegevenstoewijzing in een testcluster. Vervolgens zet u een opname van blobopslag in het cluster in de wachtrij met behulp van de Java SDK en valideert u de resultaten.

Vereisten

De code bekijken

Deze sectie is optioneel. Bekijk de volgende codefragmenten voor meer informatie over hoe de code werkt. Als u deze sectie wilt overslaan, gaat u naar de toepassing uitvoeren.

Verificatie

Het programma maakt gebruik van Microsoft Entra verificatiereferenties met ConnectionStringBuilder'.

  1. Maak een com.microsoft.azure.kusto.data.Client voor query's en beheer.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Maak en gebruik een com.microsoft.azure.kusto.ingest.IngestClient om gegevensopname in de wachtrij te plaatsen in Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Opdrachten voor beheer

Beheeropdrachten, zoals .drop en .create, worden uitgevoerd door aan te roepen execute op een com.microsoft.azure.kusto.data.Client -object.

De tabel wordt bijvoorbeeld StormEvents als volgt gemaakt:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Gegevensopname

Wachtrijopname met behulp van een bestand uit een bestaande Azure Blob Storage container.

  • Gebruik BlobSourceInfo om het blobopslagpad op te geven.
  • Gebruik IngestionProperties om tabel, database, toewijzingsnaam en gegevenstype te definiĆ«ren. In het volgende voorbeeld is CSVhet gegevenstype .
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Het opnameproces begint in een afzonderlijke thread en de main thread wacht tot de opnamethread is voltooid. Dit proces maakt gebruik van CountdownLatch. De opname-API (IngestClient#ingestFromBlob) is niet asynchroon. Een while lus wordt gebruikt om de huidige status elke 5 seconden te peilen en te wachten tot de opnamestatus is gewijzigd van Pending in een andere status. De uiteindelijke status kan , Failedof PartiallySucceededzijnSucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tip

Er zijn andere methoden om opname asynchroon af te handelen voor verschillende toepassingen. U kunt bijvoorbeeld gebruiken CompletableFuture om een pijplijn te maken die de actie na opname definieert, zoals een query uitvoeren op de tabel of uitzonderingen verwerken die zijn gerapporteerd aan de IngestionStatus.

De toepassing uitvoeren

Algemeen

Wanneer u de voorbeeldcode uitvoert, worden de volgende acties uitgevoerd:

  1. Neerzettabel: StormEvents tabel wordt verwijderd (als deze bestaat).
  2. Tabel maken: StormEvents tabel wordt gemaakt.
  3. Toewijzing maken: StormEvents_CSV_Mapping toewijzing wordt gemaakt.
  4. Bestandsopname: een CSV-bestand (in Azure Blob Storage) wordt in de wachtrij geplaatst voor opname.

De volgende voorbeeldcode is van App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tip

Als u verschillende combinaties van bewerkingen wilt proberen, verwijdert u opmerkingen bij de respectieve methoden in App.java.

De toepassing uitvoeren

  1. Kloon de voorbeeldcode uit GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Stel de service-principalgegevens in met de volgende informatie als omgevingsvariabelen die door het programma worden gebruikt:

    • Clustereindpunt
    • Databasenaam
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Bouwen en uitvoeren:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    De uitvoer is vergelijkbaar met:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Wacht enkele minuten totdat het opnameproces is voltooid. Nadat het is voltooid, ziet u het volgende logboekbericht: Ingestion completed successfully. U kunt het programma op dit moment afsluiten en naar de volgende stap gaan zonder dat dit van invloed is op het opnameproces, dat al in de wachtrij is geplaatst.

Valideren

Wacht vijf tot 10 minuten totdat de opname in de wachtrij het opnameproces heeft gepland en gegevens in Azure Data Explorer zijn geladen.

  1. Meld u aan bij https://dataexplorer.azure.com en maak verbinding met uw cluster.

  2. Voer de volgende opdracht uit om het aantal records in de StormEvents tabel op te halen:

    StormEvents | count
    

Problemen oplossen

  1. Als u opnamefouten in de afgelopen vier uur wilt zien, voert u de volgende opdracht uit op uw database:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Voer de volgende opdracht uit om de status van alle opnamebewerkingen in de afgelopen vier uur weer te geven:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Resources opschonen

Als u niet van plan bent om de resources te gebruiken die u hebt gemaakt, voert u de volgende opdracht uit in uw database om de StormEvents tabel te verwijderen.

.drop table StormEvents