Adatok betöltése a Kusto .NET SDK használatával
A .NET-hez két ügyfélkódtár érhető el: egy betöltési és egy adattár. A .NET SDK-val kapcsolatos további információkért lásd: .NET SDK. Ezekkel a kódtárakkal adatokat tölthet be egy fürtbe, illetve adatokat kérdezhet le a kódból. Ebben a cikkben először egy táblát és egy adatleképezést hoz létre egy tesztfürtben. Ezután várólistára kell tennie egy betöltési műveletet a fürtnek, és ellenőriznie kell az eredményeket.
Előfeltételek
- Microsoft-fiók vagy Microsoft Entra felhasználói identitás. Nincs szükség Azure-előfizetésre.
- Egy fürt és egy adatbázis. Hozzon létre egy fürtöt és egy adatbázist.
A betöltési kódtár telepítése
Install-Package Microsoft.Azure.Kusto.Ingest
Hitelesítés hozzáadása és kapcsolati karakterlánc létrehozása
Hitelesítés
Egy alkalmazás hitelesítéséhez az SDK a Microsoft Entra bérlőazonosítóját használja. A bérlőazonosító megkereséséhez használja a következő URL-címet úgy, hogy a YourDomain kifejezés helyére a saját tartományát írja be.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Ha például a tartomány a contoso.com, az URL-cím a következő: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Kattintson erre az URL-címre az eredmények megtekintéséhez; az első sor a következő.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
A bérlőazonosító ebben az esetben a következő: 6babcaad-604b-40ac-a9d7-9fd97c0b779f
.
Ez a példa interaktív Microsoft Entra felhasználóhitelesítést használ a fürt eléréséhez. Az Microsoft Entra alkalmazáshitelesítést tanúsítványsal vagy titkos alkalmazáskóddal is használhatja. A kód futtatása előtt mindenképpen állítsa be a és clusterUri
a megfelelő értékekettenantId
.
Az SDK kényelmes módot biztosít a hitelesítési módszer beállítására a kapcsolati karakterlánc részeként. A kapcsolati sztringek teljes dokumentációját lásd: Kapcsolati sztringek.
Megjegyzés
Az SDK jelenlegi verziója nem támogatja az interaktív felhasználói hitelesítést a .NET Core-on. Ha szükséges, használjon inkább Microsoft Entra felhasználónevet/jelszót vagy alkalmazáshitelesítést.
A kapcsolati karakterlánc létrehozása
Most már létrehozhatja a kapcsolati karakterlánc. A céltáblát és a leképezést egy későbbi lépésben fogja létrehozni.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
A forrásfájl adatainak beállítása
Állítsa be a forrásfájl elérési útját. Ez a példa egy Azure Blob Storage-ban üzemeltetett mintafájlt használ. A StormEvents mintaadatkészlet a National Centers for Environment Information időjárással kapcsolatos adatait tartalmazza.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Tábla létrehozása a tesztfürtön
Hozzon létre egy nevű táblát StormEvents
, amely megfelel a fájlban lévő adatok sémájának StormEvents.csv
.
Tipp
Az alábbi kódrészletek szinte minden híváshoz létrehoznak egy ügyfélpéldányt. Ez azért történik, hogy az egyes kódrészletek egyenként futtathatók legyenek. Éles környezetben az ügyfélpéldányok újraküldésre kerülnek, és a szükséges ideig meg kell tartani őket. URI-nként egyetlen ügyfélpéldány elegendő, még akkor is, ha több adatbázissal dolgozik (az adatbázis parancsszinten adható meg).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Adatbetöltési leképezés meghatározása
Képezte le a bejövő CSV-adatokat a tábla létrehozásakor használt oszlopnevekre. Hozzon létre egy CSV-oszlopleképezési objektumot a táblán.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Kötegelési szabályzat definiálása a táblához
A bejövő adatok kötegelése optimalizálja az adat szegmensméretét, amelyet a betöltési kötegelési szabályzat szabályoz. Módosítsa a szabályzatot a betöltési kötegelési házirend kezelése paranccsal. Ezzel a szabályzattal csökkentheti a lassan érkező adatok késését.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Javasoljuk, hogy határozzon meg egy Raw Data Size
értéket a betöltött adatokhoz, és növelje a méretet 250 MB-ra, miközben ellenőrizze, hogy javul-e a teljesítmény.
A tulajdonság használatával kihagyhatja a Flush Immediately
kötegelést, bár ez nem ajánlott a nagy léptékű betöltéshez, mivel ez gyenge teljesítményt okozhat.
Üzenet várólistába helyezése a betöltéshez
Üzenet várólistára helyezése a blobtárolóból származó adatok lekéréséhez és az adatok betöltéséhez. Létrejön egy kapcsolat a betöltési fürttel, és egy másik ügyfél jön létre a végponttal való együttműködéshez.
Tipp
Az alábbi kódrészletek szinte minden híváshoz létrehoznak egy ügyfélpéldányt. Ez azért történik, hogy az egyes kódrészletek egyenként futtathatók legyenek. Éles környezetben az ügyfélpéldányok újraküldésre kerülnek, és a szükséges ideig meg kell tartani őket. URI-nként egyetlen ügyfélpéldány elegendő, még akkor is, ha több adatbázissal dolgozik (az adatbázis parancsszinten adható meg).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Annak ellenőrzése, hogy az adatok betöltése megtörtént-e a táblába
Várjon 5–10 percet, amíg a várólistán lévő betöltési folyamat ütemezi a betöltést, és betölti az adatokat a fürtbe. Ezután futtassa a következő kódot a StormEvents
-táblában lévő rekordok számának lekérdezéséhez.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Hibaelhárítási lekérdezések futtatása
Jelentkezzen be a https://dataexplorer.azure.com oldalon, és csatlakozzon a fürthöz. Futtassa a következő parancsot az adatbázisban annak ellenőrzéséhez, hogy voltak-e betöltési hibák az elmúlt négy órában. A futtatás előtt cserélje le az adatbázis nevét.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Futtassa a következő parancsot az elmúlt négy órában végzett összes betöltési művelet állapotának megtekintéséhez. A futtatás előtt cserélje le az adatbázis nevét.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Az erőforrások eltávolítása
Ha követni szeretné a többi cikkünket, tartsa meg a létrehozott erőforrásokat. Ha nem szeretné, futtassa a következő parancsot az adatbázisban a StormEvents
-tábla felesleges elemeinek eltávolításához.
.drop table StormEvents