Gegevens opnemen met behulp van de Kusto Java SDK
Azure Data Explorer is een snelle en zeer schaalbare service voor gegevensverkenning voor telemetrische gegevens en gegevens uit logboeken. De Java-clientbibliotheek kan worden gebruikt om gegevens op te nemen, opdrachten voor probleembeheer op te nemen en query's uit te voeren op gegevens in Azure Data Explorer-clusters.
In dit artikel leert u hoe u gegevens opneemt met behulp van de Azure Data Explorer Java-bibliotheek. Eerst maakt u een tabel en een gegevenstoewijzing in een testcluster. Vervolgens zet u een opname van blobopslag in het cluster in de wachtrij met behulp van de Java SDK en valideert u de resultaten.
Vereisten
- Een Microsoft-account of een Microsoft Entra gebruikersidentiteit. Een Azure-abonnement is niet vereist.
- Een Azure Data Explorer-cluster en -database. Maak een cluster en database.
- Git.
- JDK versie 1.8 of hoger.
- Maven.
- Maak een app-registratie en ververleent deze machtigingen voor de database. Sla de client-id en het clientgeheim op voor later gebruik.
De code bekijken
Deze sectie is optioneel. Bekijk de volgende codefragmenten voor meer informatie over hoe de code werkt. Als u deze sectie wilt overslaan, gaat u naar de toepassing uitvoeren.
Verificatie
Het programma maakt gebruik van Microsoft Entra verificatiereferenties met ConnectionStringBuilder'.
Maak een
com.microsoft.azure.kusto.data.Client
voor query's en beheer.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Maak en gebruik een
com.microsoft.azure.kusto.ingest.IngestClient
om gegevensopname in de wachtrij te plaatsen in Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Opdrachten voor beheer
Beheeropdrachten, zoals .drop
en .create
, worden uitgevoerd door aan te roepen execute
op een com.microsoft.azure.kusto.data.Client
-object.
De tabel wordt bijvoorbeeld StormEvents
als volgt gemaakt:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Gegevensopname
Wachtrijopname met behulp van een bestand uit een bestaande Azure Blob Storage container.
- Gebruik
BlobSourceInfo
om het blobopslagpad op te geven. - Gebruik
IngestionProperties
om tabel, database, toewijzingsnaam en gegevenstype te definiƫren. In het volgende voorbeeld isCSV
het gegevenstype .
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Het opnameproces begint in een afzonderlijke thread en de main
thread wacht tot de opnamethread is voltooid. Dit proces maakt gebruik van CountdownLatch. De opname-API (IngestClient#ingestFromBlob
) is niet asynchroon. Een while
lus wordt gebruikt om de huidige status elke 5 seconden te peilen en te wachten tot de opnamestatus is gewijzigd van Pending
in een andere status. De uiteindelijke status kan , Failed
of PartiallySucceeded
zijnSucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tip
Er zijn andere methoden om opname asynchroon af te handelen voor verschillende toepassingen. U kunt bijvoorbeeld gebruiken CompletableFuture
om een pijplijn te maken die de actie na opname definieert, zoals een query uitvoeren op de tabel of uitzonderingen verwerken die zijn gerapporteerd aan de IngestionStatus
.
De toepassing uitvoeren
Algemeen
Wanneer u de voorbeeldcode uitvoert, worden de volgende acties uitgevoerd:
-
Neerzettabel:
StormEvents
tabel wordt verwijderd (als deze bestaat). -
Tabel maken:
StormEvents
tabel wordt gemaakt. -
Toewijzing maken:
StormEvents_CSV_Mapping
toewijzing wordt gemaakt. - Bestandsopname: een CSV-bestand (in Azure Blob Storage) wordt in de wachtrij geplaatst voor opname.
De volgende voorbeeldcode is van App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tip
Als u verschillende combinaties van bewerkingen wilt proberen, verwijdert u opmerkingen bij de respectieve methoden in App.java
.
De toepassing uitvoeren
Kloon de voorbeeldcode uit GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Stel de service-principalgegevens in met de volgende informatie als omgevingsvariabelen die door het programma worden gebruikt:
- Clustereindpunt
- Databasenaam
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Bouwen en uitvoeren:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
De uitvoer is vergelijkbaar met:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Wacht enkele minuten totdat het opnameproces is voltooid. Nadat het is voltooid, ziet u het volgende logboekbericht: Ingestion completed successfully
. U kunt het programma op dit moment afsluiten en naar de volgende stap gaan zonder dat dit van invloed is op het opnameproces, dat al in de wachtrij is geplaatst.
Valideren
Wacht vijf tot 10 minuten totdat de opname in de wachtrij het opnameproces heeft gepland en gegevens in Azure Data Explorer zijn geladen.
Meld u aan bij https://dataexplorer.azure.com en maak verbinding met uw cluster.
Voer de volgende opdracht uit om het aantal records in de
StormEvents
tabel op te halen:StormEvents | count
Problemen oplossen
Als u opnamefouten in de afgelopen vier uur wilt zien, voert u de volgende opdracht uit op uw database:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Voer de volgende opdracht uit om de status van alle opnamebewerkingen in de afgelopen vier uur weer te geven:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Resources opschonen
Als u niet van plan bent om de resources te gebruiken die u hebt gemaakt, voert u de volgende opdracht uit in uw database om de StormEvents
tabel te verwijderen.
.drop table StormEvents