Adatok betöltése a Kusto Java SDK használatával
Az Azure Adatkezelő egy gyors és hatékonyan skálázható adatáttekintési szolgáltatás napló- és telemetriaadatokhoz. A Java-ügyfélkódtár használható adatok betöltésére, problémakezelési parancsokra és adatok lekérdezésére az Azure Data Explorer-fürtökben.
Ebből a cikkből megtudhatja, hogyan betölthet adatokat az Azure Data Explorer Java-kódtár használatával. Először egy táblát és egy adatleképezést fog létrehozni egy tesztfürtben. Ezután a Java SDK használatával várólistára állítja a blobtárolóból a fürtbe történő betöltést, és ellenőrzi az eredményeket.
Előfeltételek
- Microsoft-fiók vagy Microsoft Entra felhasználói identitás. Nincs szükség Azure-előfizetésre.
- Egy Azure-Data Explorer-fürt és -adatbázis. Hozzon létre egy fürtöt és egy adatbázist.
- Git.
- A JDK 1.8-os vagy újabb verziója.
- Maven.
- Hozzon létre egy alkalmazásregisztrációt, és adjon neki engedélyeket az adatbázisnak. Mentse az ügyfélazonosítót és az ügyfél titkos kódját későbbi használatra.
A kód áttekintése
Ez a szakasz nem kötelező. A kód működésének megismeréséhez tekintse át az alábbi kódrészleteket. A szakasz kihagyásához nyissa meg az alkalmazást.
Hitelesítés
A program Microsoft Entra hitelesítési hitelesítő adatokat használ a ConnectionStringBuilder használatával."
Hozzon létre egy lekérdezést
com.microsoft.azure.kusto.data.Client
és felügyeletet.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Hozzon létre és használjon egy
com.microsoft.azure.kusto.ingest.IngestClient
elemet az adatbetöltés várólistára helyezéséhez az Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Felügyeleti parancsok
A felügyeleti parancsok ( például .drop
és .create
) egy objektum meghívásával execute
com.microsoft.azure.kusto.data.Client
lesznek végrehajtva.
A tábla például a StormEvents
következőképpen jön létre:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Adatfeldolgozás
Várólista-betöltés meglévő Azure Blob Storage tárolóból származó fájl használatával.
- A Blob Storage elérési útjának megadására használható
BlobSourceInfo
. - A tábla, az adatbázis, a leképezés neve és az adattípus definiálására használható
IngestionProperties
. Az alábbi példában az adattípus a következőCSV
: .
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
A betöltési folyamat egy külön szálban kezdődik, és a main
szál megvárja, amíg a betöltési szál befejeződik. Ez a folyamat a CountdownLatch függvényt használja. A betöltési API (IngestClient#ingestFromBlob
) nem aszinkron. A while
rendszer 5 másodpercenként lekérdezi az aktuális állapotot, és megvárja, amíg a betöltési állapot egy másik állapotra változik Pending
. A végső állapot lehet Succeeded
, Failed
vagy PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tipp
A különböző alkalmazások aszinkron módon történő betöltésének kezelésére más módszerek is léteznek. Például létrehozhat egy folyamatot, CompletableFuture
amely meghatározza a betöltés utáni műveletet, például lekérdezheti a táblát, vagy kezelheti a IngestionStatus
jelentésben szereplő kivételeket.
Az alkalmazás futtatása
Általános kérdések
A mintakód futtatásakor a következő műveletek lesznek végrehajtva:
-
Tábla elvetése:
StormEvents
a tábla el lesz vetve (ha létezik). -
Tábla létrehozása:
StormEvents
a tábla létrejön. -
Leképezés létrehozása:
StormEvents_CSV_Mapping
a leképezés létrejön. - Fájlbetöltés: Egy CSV-fájl (Azure Blob Storage) várólistára kerül a betöltéshez.
A következő mintakód a következőből származik App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tipp
A műveletek különböző kombinációinak kipróbálásához bontsa ki a megfelelő metódusokat a következőben App.java
: .
Az alkalmazás futtatása
Klónozza a mintakódot a GitHubról:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Állítsa be a szolgáltatásnév adatait a következő információkkal a program által használt környezeti változókként:
- Fürtvégpont
- Adatbázis neve
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Buildelés és futtatás:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
A kimenet a következőhöz hasonló lesz:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Várjon néhány percet, amíg a betöltési folyamat befejeződik. A sikeres befejezés után a következő naplóüzenet jelenik meg: Ingestion completed successfully
. Ezen a ponton kiléphet a programból, és továbbléphet a következő lépésre anélkül, hogy ez hatással lenne a már várólistára helyezett betöltési folyamatra.
Érvényesítés
Várjon 5–10 percet, amíg az üzenetsorba helyezett betöltés ütemezi a betöltési folyamatot, és betölti az adatokat az Azure Data Explorer.
Jelentkezzen be a https://dataexplorer.azure.com oldalon, és csatlakozzon a fürthöz.
Futtassa a következő parancsot a táblában lévő
StormEvents
rekordok számának lekéréséhez:StormEvents | count
Hibaelhárítás
Az elmúlt négy órában előforduló betöltési hibák megtekintéséhez futtassa a következő parancsot az adatbázisban:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Az elmúlt négy órában végrehajtott összes betöltési művelet állapotának megtekintéséhez futtassa a következő parancsot:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Az erőforrások eltávolítása
Ha nem tervezi használni a létrehozott erőforrásokat, futtassa a következő parancsot az adatbázisban a StormEvents
tábla elvetéséhez.
.drop table StormEvents