Menyerap data menggunakan Kusto Java SDK
Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Pustaka klien Java dapat digunakan untuk menyerap data, mengeluarkan perintah manajemen, dan mengkueri data di kluster Azure Data Explorer.
Dalam artikel ini, pelajari cara menyerap data menggunakan pustaka Azure Data Explorer Java. Pertama, Anda akan membuat tabel dan pemetaan data dalam kluster pengujian. Kemudian Anda akan mengantre penyerapan dari penyimpanan blob ke kluster menggunakan Java SDK dan memvalidasi hasilnya.
Prasyarat
- Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
- Kluster dan database Azure Data Explorer. Membuat kluster dan database.
- Git.
- JDK versi 1.8 atau yang lebih baru.
- Maven.
- Buat Pendaftaran Aplikasi dan berikan izin ke database. Simpan ID klien dan rahasia klien untuk digunakan nanti.
Mengulas kode
Bagian ini bersifat opsional. Tinjau cuplikan kode berikut untuk mempelajari cara kerja kode. Untuk melewati bagian ini, buka menjalankan aplikasi.
Autentikasi
Program ini menggunakan kredensial autentikasi Microsoft Entra dengan ConnectionStringBuilder'.
Buat
com.microsoft.azure.kusto.data.Client
untuk kueri dan manajemen.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Buat dan gunakan untuk mengantrekan
com.microsoft.azure.kusto.ingest.IngestClient
penyerapan data ke Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Perintah manajemen
Perintah manajemen, seperti .drop
dan .create
, dijalankan dengan memanggil execute
com.microsoft.azure.kusto.data.Client
objek.
Misalnya, StormEvents
tabel dibuat sebagai berikut:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Penyerapan data
Penyerapan antrean dengan menggunakan file dari kontainer Azure Blob Storage yang ada.
- Gunakan
BlobSourceInfo
untuk menentukan jalur Blob Storage. - Gunakan
IngestionProperties
untuk menentukan tabel, database, nama pemetaan, dan jenis data. Dalam contoh berikut, jenis datanya adalahCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Proses penyerapan dimulai dalam utas terpisah dan main
utas menunggu utas penyerapan selesai. Proses ini menggunakan CountdownLatch. API penyerapan (IngestClient#ingestFromBlob
) tidak asinkron. Perulangan while
digunakan untuk melakukan polling status saat ini setiap 5 detik dan menunggu status penyerapan berubah dari Pending
ke status yang berbeda. Status akhir dapat berupa Succeeded
, Failed
, atau PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tip
Ada metode lain untuk menangani penyerapan secara asinkron untuk aplikasi yang berbeda. Misalnya, Anda dapat menggunakan CompletableFuture
untuk membuat alur yang menentukan tindakan pasca-penyerapan, seperti mengkueri tabel, atau menangani pengecualian yang dilaporkan ke IngestionStatus
.
Jalankan aplikasi
Umum
Saat Anda menjalankan kode sampel, tindakan berikut dilakukan:
- Jatuhkan tabel:
StormEvents
tabel dihilangkan (jika ada). - Pembuatan tabel:
StormEvents
tabel dibuat. - Pembuatan pemetaan:
StormEvents_CSV_Mapping
pemetaan dibuat. - Penyerapan file: File CSV (di Azure Blob Storage) diantrekan untuk penyerapan.
Contoh kode berikut berasal dari App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tip
Untuk mencoba kombinasi operasi yang berbeda, batalkan komentar/komentari metode masing-masing di App.java
.
Jalankan aplikasi
Kloning kode sampel dari GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Atur informasi perwakilan layanan dengan informasi berikut sebagai variabel lingkungan yang digunakan oleh program:
- Titik akhir kluster
- Nama database
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Bangun dan jalankan:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Outputnya akan mirip dengan:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Tunggu beberapa menit hingga proses penyerapan selesai. Setelah berhasil diselesaikan, Anda akan melihat pesan log berikut: Ingestion completed successfully
. Anda dapat keluar dari program pada saat ini dan pindah ke langkah berikutnya tanpa memengaruhi proses penyerapan, yang telah diantrekan.
Memvalidasi
Tunggu lima hingga 10 menit hingga penyerapan antrean menjadwalkan proses penyerapan dan memuat data ke Azure Data Explorer.
Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda.
Jalankan perintah berikut untuk mendapatkan jumlah rekaman dalam
StormEvents
tabel:StormEvents | count
Pecahkan masalah
Untuk melihat kegagalan penyerapan dalam empat jam terakhir, jalankan perintah berikut pada database Anda:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Untuk melihat status semua operasi penyerapan dalam empat jam terakhir, jalankan perintah berikut:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Membersihkan sumber daya
Jika Anda tidak berencana menggunakan sumber daya yang telah Anda buat, jalankan perintah berikut ini di database Anda untuk menghilangkan StormEvents
tabel.
.drop table StormEvents