integrasi .NET AspireApache Kafka
Meliputi: integrasi hosting dan integrasi
Client
Apache Kafka adalah platform streaming acara terdistribusi sumber terbuka. Ini berguna untuk membangun alur data real-time dan aplikasi streaming. Integrasi .NET AspireApache Kafka memungkinkan Anda terhubung ke instans Kafka yang ada, atau membuat instans baru dari .NET dengan gambar kontainer docker.io/confluentinc/confluent-local
.
Hosting integrasi
Integrasi hosting Apache Kafka memodelkan server Kafka sebagai jenis KafkaServerResource. Untuk mengakses jenis ini, instal paket NuGet
dotnet add package Aspire.Hosting.Kafka
Untuk informasi selengkapnya, lihat dotnet menambahkan paket atau Mengelola dependensi paket di aplikasi .NET.
Menambahkan sumber daya server Kafka
Di proyek host aplikasi Anda, panggil AddKafka pada instans builder
untuk menambahkan sumber daya server Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Saat .NET.NET Aspire menambahkan gambar kontainer ke host aplikasi, seperti yang ditunjukkan dalam contoh sebelumnya dengan gambar docker.io/confluentinc/confluent-local
, gambar tersebut membuat instans server Kafka baru di komputer lokal Anda. Referensi ke server Kafka Anda (variabel kafka
) ditambahkan ke ExampleProject
. Sumber daya server Kafka mencakup port default
Metode WithReference mengonfigurasi koneksi di ExampleProject
bernama "kafka"
. Untuk informasi selengkapnya, lihat siklus hidup sumber daya Kontainer.
Saran
Jika Anda lebih suka menyambungkan ke server Kafka yang ada, panggil AddConnectionString sebagai gantinya. Untuk informasi selengkapnya, lihat Referensi sumber daya yang ada.
Menambahkan Kafka UI
Untuk menambahkan UI Kafka
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI adalah UI web sumber terbuka gratis untuk memantau dan mengelola kluster Apache Kafka.
.NET
.NET Aspire menambahkan gambar kontainer lain docker.io/provectuslabs/kafka-ui
ke host aplikasi yang menjalankan UI Kafka.
Mengubah host port untuk Kafka UI
Untuk mengubah port host Kafka UI, rantai panggilan ke metode WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI dapat diakses pada http://localhost:9100
dalam contoh sebelumnya.
Tambahkan sumber daya server Kafka dengan volume data
Untuk menambahkan volume data ke sumber daya server Kafka, panggil metode WithDataVolume pada sumber daya server Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Volume data digunakan untuk mempertahankan data server Kafka di luar siklus hidup kontainernya. Volume data dipasang di jalur /var/lib/kafka/data
di kontainer server Kafka dan ketika parameter name
tidak disediakan, nama dihasilkan secara acak. Untuk informasi lebih lanjut mengenai volume data dan alasan mengapa bind mounts tidak menjadi pilihan utama, lihat dokumen Docker: Volume.
Menambahkan sumber daya server Kafka dengan pemasangan ikatan data
Untuk menambahkan pemasangan ikatan data ke sumber daya server Kafka, panggil metode WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Penting
Data bind mounts memiliki fungsionalitas terbatas dibandingkan dengan volumes, yang menawarkan kinerja, portabilitas, dan keamanan yang lebih baik, membuatnya lebih cocok untuk lingkungan produksi. Namun, bind mount memungkinkan akses langsung dan modifikasi file pada sistem host, idealis untuk pengembangan dan pengujian di mana perubahan real-time diperlukan.
Pemasangan bind data mengandalkan sistem file mesin host untuk mempertahankan data server Kafka selama memulai ulang kontainer. Data bind mount dipasang di C:\Kafka\Data
pada jalur di sistem Windows (atau /Kafka/Data
di Unix) pada mesin host dalam kontainer server Kafka. Untuk informasi selengkapnya tentang pemasangan ikatan data, lihat dokumen Docker: Pemasangan ikatan.
Pengawasan kesehatan integrasi
Integrasi hosting Kafka secara otomatis menambahkan pemeriksaan kesehatan untuk sumber daya server Kafka. Pemeriksaan kesehatan memverifikasi bahwa produsen Kafka dengan nama koneksi yang ditentukan dapat terhubung dan menyimpan topik ke server Kafka.
Integrasi hosting bergantung pada paket 📦 AspNetCore.HealthChecks.Kafka NuGet.
integrasi Client
Untuk mulai menggunakan integrasi .NET AspireApache Kafka, instal paket NuGet 📦Aspire.Confluent.Kafka dalam proyek yang menggunakan klien, yaitu proyek aplikasi yang memanfaatkan klien Apache Kafka.
dotnet add package Aspire.Confluent.Kafka
Tambahkan produser Kafka
Dalam file Program.cs proyek yang menggunakan layanan klien Anda, panggil metode ekstensi AddKafkaProducer untuk mendaftarkan IProducer<TKey, TValue>
supaya dapat digunakan melalui container injeksi ketergantungan. Metode ini mengambil dua parameter generik yang sesuai dengan jenis kunci dan jenis pesan untuk dikirim ke broker. Parameter generik ini digunakan oleh AddKafkaProducer
untuk membuat instans ProducerBuilder<TKey, TValue>
. Metode ini juga mengambil parameter nama koneksi.
builder.AddKafkaProducer<string, string>("messaging");
Anda kemudian bisa mendapatkan instans IProducer<TKey, TValue>
menggunakan injeksi dependensi. Misalnya, untuk mengambil produsen dari IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Untuk informasi selengkapnya tentang pekerja, lihat layanan pekerja di .NET.
Tambahkan konsumen Kafka
Untuk mendaftarkan IConsumer<TKey, TValue>
agar bisa digunakan melalui kontainer injeksi dependensi, panggil metode ekstensi AddKafkaConsumer di dalam file Program.cs dari proyek klien yang menggunakannya. Metode ini mengambil dua parameter generik yang sesuai dengan jenis kunci dan jenis pesan yang akan diterima dari broker. Parameter generik ini digunakan oleh AddKafkaConsumer
untuk membuat instans ConsumerBuilder<TKey, TValue>
. Metode ini juga mengambil parameter nama koneksi.
builder.AddKafkaConsumer<string, string>("messaging");
Anda kemudian dapat mendapatkan instans IConsumer<TKey, TValue>
menggunakan injeksi dependensi. Misalnya, untuk mengambil data konsumen dari sebuah IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Tambahkan produsen atau konsumen Kafka dengan penggunaan kunci
Mungkin ada situasi di mana Anda ingin mendaftarkan beberapa produsen atau instans konsumen dengan nama koneksi yang berbeda. Untuk mendaftarkan produsen atau konsumen berjenis kunci Kafka, gunakan API yang sesuai.
- AddKeyedKafkaProducer: Mendaftarkan produser Kafka yang diberi kunci.
- AddKeyedKafkaConsumer: Mendaftarkan konsumen Kafka berbasis kunci.
Untuk informasi selengkapnya tentang layanan ber-key, lihat .NET injeksi dependensi: Layanan ber-key.
Konfigurasi
Integrasi .NET AspireApache Kafka menyediakan beberapa opsi untuk mengonfigurasi koneksi berdasarkan persyaratan dan konvensi proyek Anda.
Menggunakan string koneksi
Saat menggunakan string koneksi dari bagian konfigurasi ConnectionStrings
, Anda dapat memberikan nama string koneksi saat memanggil builder.AddKafkaProducer()
atau builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Kemudian string koneksi diambil dari bagian konfigurasi ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Nilai string koneksi ditetapkan pada properti BootstrapServers
dari instans IProducer<TKey, TValue>
atau IConsumer<TKey, TValue>
yang dihasilkan. Untuk informasi selengkapnya, lihat BootstrapServers.
Menggunakan penyedia konfigurasi
Integrasi .NET AspireApache Kafka mendukung Microsoft.Extensions.Configuration. Memuat KafkaProducerSettings atau KafkaConsumerSettings dari konfigurasi dengan masing-masing menggunakan kunci Aspire:Confluent:Kafka:Producer
dan Aspire.Confluent:Kafka:Consumer
. Cuplikan berikut adalah contoh file appsettings.json yang mengonfigurasi beberapa opsi:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Properti Config
dari bagian konfigurasi Aspire:Confluent:Kafka:Producer
dan Aspire.Confluent:Kafka:Consumer
masing-masing berhubungan dengan instans ProducerConfig
dan ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
mengharuskan properti ClientId
diatur untuk memungkinkan broker melacak offset pesan yang sudah dikonsumsi.
Untuk skema JSON integrasi klien Kafka lengkap, lihat Aspire. Confluent.Kafka/ConfigurationSchema.json.
Gunakan delegasi sebaris
Ada beberapa delegasi inline yang tersedia untuk mengonfigurasi berbagai opsi.
MengonfigurasiKafkaProducerSettings
dan KafkaConsumerSettings
Anda dapat mengirimkan parameter Action<KafkaProducerSettings> configureSettings
untuk menyiapkan beberapa atau semua opsi langsung, misalnya untuk menonaktifkan cek kesehatan dari kode:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Anda dapat mengonfigurasi sebuah "consumer" secara langsung dalam kode:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Mengonfigurasi ProducerBuilder<TKey, TValue>
dan ConsumerBuilder<TKey, TValue>
Untuk mengonfigurasi pembangun Confluent.Kafka
, berikan Action<ProducerBuilder<TKey, TValue>>
(atau Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Saat mendaftarkan produsen dan konsumen, jika Anda perlu mengakses layanan yang terdaftar dalam kontainer DI, Anda dapat meneruskan Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
atau Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
masing-masing:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Pertimbangkan contoh pendaftaran produsen berikut:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Untuk informasi selengkapnya, lihat dokumentasi API ProducerBuilder<TKey, TValue>
dan ConsumerBuilder<TKey, TValue>
.
Client pemeriksaan kesehatan integrasi
Secara default, integrasi .NET.NET Aspire memungkinkan pemeriksaan kesehatan untuk semua layanan. Untuk informasi selengkapnya, lihat gambaran umum integrasi .NET.NET Aspire.
Integrasi .NET AspireApache Kafka menangani skenario pemeriksaan kesehatan berikut:
- Menambahkan pengecekan kesehatan
Aspire.Confluent.Kafka.Producer
ketika KafkaProducerSettings.DisableHealthChecks adalahfalse
. - Menambahkan pengecekan kesehatan
Aspire.Confluent.Kafka.Consumer
ketika KafkaConsumerSettings.DisableHealthChecks adalahfalse
. - Terintegrasi dengan titik akhir HTTP
/health
, yang menentukan semua pemeriksaan kesehatan terdaftar harus lulus agar aplikasi dianggap siap menerima lalu lintas.
Pengamatan dan telemetri
.NET
.NET Aspire integrasi secara otomatis menyiapkan konfigurasi Pengelogan, Pelacakan, dan Metrik, yang terkadang dikenal sebagai pilar pengamatan. Untuk informasi selengkapnya tentang pengamatan integrasi dan telemetri, lihat gambaran umum integrasi .NET.NET Aspire. Bergantung pada layanan pendukung, beberapa integrasi mungkin hanya mendukung beberapa fitur ini. Misalnya, beberapa integrasi mendukung pengelogan dan pelacakan, tetapi bukan metrik. Fitur telemetri juga dapat dinonaktifkan menggunakan teknik yang disajikan di bagian Konfigurasi
Pencatatan
Integrasi .NET AspireApache Kafka menggunakan kategori log berikut:
Aspire.Confluent.Kafka
Menelusuri
Integrasi .NET AspireApache Kafka tidak memancarkan jejak terdistribusi.
Metrik
Integrasi .NET AspireApache Kafka memancarkan metrik berikut menggunakan OpenTelemetry:
Aspire.Confluent.Kafka
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
Lihat juga
.NET Aspire