Mengonfigurasi penyerapan streaming di kluster Azure Data Explorer Anda
Artikel
Konsumsi streaming berguna untuk memuat data ketika Anda membutuhkan latensi rendah antara konsumsi dan kueri. Pertimbangkan untuk menggunakan konsumsi streaming dalam skenario berikut:
Latensi kurang dari satu detik diperlukan.
Untuk mengoptimalkan pemrosesan operasional banyak tabel di mana aliran data ke setiap tabel relatif kecil (beberapa catatan per detik), tetapi volume konsumsi data keseluruhan tinggi (ribuan catatan per detik).
Jika aliran data ke setiap tabel tinggi (lebih dari 4 GB per jam), pertimbangkan untuk menggunakan penyerapan antrean.
Untuk sampel kode berdasarkan versi SDK sebelumnya, lihat artikel yang diarsipkan.
Pilih jenis konsumsi streaming yang sesuai
Dua jenis konsumsi streaming didukung:
Jenis penyerapan
Deskripsi
Koneksi data
Koneksi data Azure Event Hubs, IoT Hub, dan Event Grid dapat menggunakan penyerapan streaming, asalkan diaktifkan pada tingkat kluster. Keputusan untuk menggunakan penyerapan streaming dilakukan sesuai dengan kebijakan penyerapan streaming yang dikonfigurasi pada tabel target. Untuk informasi tentang mengelola koneksi data, lihat Event Hub, IoT Hub, dan Event Grid.
Konsumsi Kustom
Penyerapan kustom mengharuskan Anda menulis aplikasi yang menggunakan salah satu pustaka klien Azure Data Explorer. Gunakan informasi dalam topik ini untuk mengonfigurasi konsumsi kustom. Anda juga dapat menemukan aplikasi sampel penyerapan streaming C?view=azure-data-explorer&preserve-view=true# bermanfaat.
Gunakan tabel berikut untuk membantu Anda memilih jenis konsumsi yang sesuai untuk lingkungan Anda:
Kriteria
Koneksi data
Konsumsi Kustom
Keterlambatan data antara inisiasi konsumsi dan data yang tersedia untuk kueri
Penundaan yang lebih lama
Penundaan yang lebih pendek
Pengembangan overhead
Pengaturan cepat dan mudah, tidak ada overhead pengembangan
Overhead pengembangan tinggi untuk membuat aplikasi menelan data, menangani kesalahan, dan memastikan konsistensi data
Catatan
Anda dapat mengelola proses untuk mengaktifkan dan menonaktifkan penyerapan streaming pada kluster Anda menggunakan portal Azure atau secara terprogram di C#. Jika Anda menggunakan C# untuk aplikasi kustom, Anda mungkin merasa lebih nyaman menggunakan pendekatan terprogram.
Kontributor utama yang dapat mempengaruhi konsumsi streaming adalah:
VM dan ukuran kluster: Performa penyerapan streaming dan skala kapasitas dengan peningkatan ukuran VM dan kluster. Jumlah permintaan konsumsi bersamaan dibatasi hingga enam per inti. Misalnya, untuk 16 SKU inti, seperti D14 dan L16, beban maksimal yang didukung adalah 96 permintaan penyerapan bersamaan. Untuk dua SKU inti, seperti D11, beban maksimal yang didukung adalah 12 permintaan penyerapan bersamaan.
{4}Batas ukuran{2}data: Batas ukuran data untuk permintaan konsumsi streaming adalah 4 MB. Ini termasuk data apa pun yang dibuat untuk kebijakan pembaruan selama penyerapan.
Pembaruan skema:Pembaruan skema, seperti pembuatan dan modifikasi tabel dan pemetaan konsumsi, dapat memakan waktu hingga lima menit untuk layanan konsumsi streaming. Untuk informasi selengkapnya lihat Konsumsi streaming dan perubahan skema.
Kapasitas SSD: Mengaktifkan penyerapan streaming pada kluster, bahkan ketika data tidak diserap melalui streaming, menggunakan bagian dari disk SSD lokal komputer kluster untuk data penyerapan streaming dan mengurangi penyimpanan yang tersedia untuk cache panas.
Mengaktifkan penyerapan streaming pada kluster Anda
Sebelum dapat menggunakan penyerapan streaming, Anda harus mengaktifkan kemampuan pada kluster Anda dan menentukan kebijakan penyerapan streaming. Anda dapat mengaktifkan kemampuan saat membuat kluster, atau menambahkannya ke kluster yang ada.
Peringatan
Tinjau batasan sebelum mengaktifkan konsumsi streaming.
Mengaktifkan penyerapan streaming saat membuat kluster baru
Anda dapat mengaktifkan penyerapan streaming saat membuat kluster baru menggunakan portal Azure atau secara terprogram di C#.
Saat membuat kluster menggunakan langkah-langkah dalam Membuat kluster dan database Azure Data Explorer, di tab Konfigurasi, pilih Streaming penyerapan>Aktif.
Untuk mengaktifkan penyerapan streaming saat membuat kluster Azure Data Explorer baru, jalankan kode berikut:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Mengaktifkan penyerapan streaming pada kluster yang ada
Jika Anda memiliki kluster yang ada, Anda dapat mengaktifkan penyerapan streaming menggunakan portal Azure atau secara terprogram di C#.
Di portal Azure, buka kluster Azure Data Explorer Anda.
Pada Pengaturan, pilih Konfigurasi.
Di panel Konfigurasi, pilih Terus untuk mengaktifkan konsumsi Streaming.
Pilih Simpan.
Anda dapat mengaktifkan penyerapan streaming saat memperbarui kluster Azure Data Explorer yang ada.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Membuat tabel target dan menentukan kebijakan
Buat tabel untuk menerima data penyerapan streaming dan tentukan kebijakan terkait menggunakan portal Azure atau secara terprogram di C#.
Salin salah satu perintah berikut ke panel Kueri dan pilih Jalankan. Ini mendefinisikan kebijakan konsumsi streaming pada tabel yang Anda buat atau pada database yang berisi tabel.
Tip
Kebijakan yang didefinisikan pada tingkat database berlaku untuk semua tabel yang ada dan masa depan dalam database. Saat Anda mengaktifkan kebijakan di tingkat database, tidak perlu mengaktifkannya per tabel.
Untuk menentukan kebijakan pada tabel yang Anda buat, gunakan:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.gzip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Menonaktifkan penyerapan streaming pada kluster Anda
Peringatan
Menonaktifkan konsumsi streaming mungkin memakan waktu beberapa jam.
Sebelum menonaktifkan penyerapan streaming pada kluster Azure Data Explorer Anda, hilangkan kebijakan penyerapan streaming dari semua tabel dan database yang relevan. Penghapusan kebijakan penyerapan streaming memicu penataan ulang data di dalam kluster Azure Data Explorer Anda. Data konsumsi streaming dipindahkan dari penyimpanan awal ke penyimpanan permanen di toko kolom (luas atau pecahan). Proses ini dapat memakan waktu antara beberapa detik hingga beberapa jam, tergantung pada jumlah data dalam penyimpanan awal.
Kebijakan konsumsi streaming
Anda dapat menghilangkan kebijakan penyerapan streaming menggunakan portal Azure atau secara terprogram di C#.
Di portal Azure, buka kluster Azure Data Explorer Anda dan pilih Kueri.
Untuk menjatuhkan kebijakan konsumsi streaming dari tabel, salin perintah berikut ke panel Kueri dan pilih Jalankan.
.delete table TestTable policy streamingingestion
Pada Pengaturan, pilih Konfigurasi.
Di panel Konfigurasi, pilih Terus untuk mengaktifkan konsumsi Streaming.
Pilih Simpan.
Untuk menghilangkan kebijakan penyerapan streaming dari tabel, jalankan kode berikut:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Untuk menonaktifkan penyerapan streaming pada kluster Anda, jalankan kode berikut:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Batasan
Pemetaan data harus dibuat sebelumnya untuk digunakan dalam konsumsi streaming. Permintaan konsumsi streaming individual tidak mengakomodasi pemetaan data inline.
Tag tingkat tidak dapat diatur pada data konsumsi streaming.
Memperbarui kebijakan Kebijakan pembaruan hanya dapat merujuk data yang baru diserap dalam tabel sumber dan bukan data atau tabel lain dalam database.
Ketika kebijakan pembaruan dengan kebijakan transaksi gagal, percobaan ulang akan kembali ke penyerapan batch.
Jika penyerapan streaming diaktifkan pada kluster yang digunakan sebagai pemimpin untuk database pengikut, penyerapan streaming harus diaktifkan pada kluster berikut juga untuk mengikuti data penyerapan streaming. Hal yang sama berlaku apakah data kluster dibagikan melalui Data Share.