Bagikan melalui


integrasi .NET AspireApache Kafka

Meliputi: integrasi hosting dan integrasi Client

Apache Kafka adalah platform streaming acara terdistribusi sumber terbuka. Ini berguna untuk membangun alur data real-time dan aplikasi streaming. Integrasi .NET AspireApache Kafka memungkinkan Anda terhubung ke instans Kafka yang ada, atau membuat instans baru dari .NET dengan gambar kontainer docker.io/confluentinc/confluent-local.

Hosting integrasi

Integrasi hosting Apache Kafka memodelkan server Kafka sebagai jenis KafkaServerResource. Untuk mengakses jenis ini, instal paket NuGet .Hosting.Kafka di proyek host aplikasi , lalu tambahkan dengan menggunakan builder.

dotnet add package Aspire.Hosting.Kafka

Untuk informasi selengkapnya, lihat dotnet menambahkan paket atau Mengelola dependensi paket di aplikasi .NET.

Menambahkan sumber daya server Kafka

Di proyek host aplikasi Anda, panggil AddKafka pada instans builder untuk menambahkan sumber daya server Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Saat .NET.NET Aspire menambahkan gambar kontainer ke host aplikasi, seperti yang ditunjukkan dalam contoh sebelumnya dengan gambar docker.io/confluentinc/confluent-local, gambar tersebut membuat instans server Kafka baru di komputer lokal Anda. Referensi ke server Kafka Anda (variabel kafka) ditambahkan ke ExampleProject. Sumber daya server Kafka mencakup port default

Metode WithReference mengonfigurasi koneksi di ExampleProject bernama "kafka". Untuk informasi selengkapnya, lihat siklus hidup sumber daya Kontainer.

Saran

Jika Anda lebih suka menyambungkan ke server Kafka yang ada, panggil AddConnectionString sebagai gantinya. Untuk informasi selengkapnya, lihat Referensi sumber daya yang ada.

Menambahkan Kafka UI

Untuk menambahkan UI Kafka ke sumber daya server Kafka, panggil metode :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI adalah UI web sumber terbuka gratis untuk memantau dan mengelola kluster Apache Kafka. .NET .NET Aspire menambahkan gambar kontainer lain docker.io/provectuslabs/kafka-ui ke host aplikasi yang menjalankan UI Kafka.

Mengubah host port untuk Kafka UI

Untuk mengubah port host Kafka UI, rantai panggilan ke metode WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI dapat diakses pada http://localhost:9100 dalam contoh sebelumnya.

Tambahkan sumber daya server Kafka dengan volume data

Untuk menambahkan volume data ke sumber daya server Kafka, panggil metode WithDataVolume pada sumber daya server Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Volume data digunakan untuk mempertahankan data server Kafka di luar siklus hidup kontainernya. Volume data dipasang di jalur /var/lib/kafka/data di kontainer server Kafka dan ketika parameter name tidak disediakan, nama dihasilkan secara acak. Untuk informasi lebih lanjut mengenai volume data dan alasan mengapa bind mounts tidak menjadi pilihan utama, lihat dokumen Docker: Volume.

Menambahkan sumber daya server Kafka dengan pemasangan ikatan data

Untuk menambahkan pemasangan ikatan data ke sumber daya server Kafka, panggil metode WithDataBindMount:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Penting

Data bind mounts memiliki fungsionalitas terbatas dibandingkan dengan volumes, yang menawarkan kinerja, portabilitas, dan keamanan yang lebih baik, membuatnya lebih cocok untuk lingkungan produksi. Namun, bind mount memungkinkan akses langsung dan modifikasi file pada sistem host, idealis untuk pengembangan dan pengujian di mana perubahan real-time diperlukan.

Pemasangan bind data mengandalkan sistem file mesin host untuk mempertahankan data server Kafka selama memulai ulang kontainer. Data bind mount dipasang di C:\Kafka\Data pada jalur di sistem Windows (atau /Kafka/Data di Unix) pada mesin host dalam kontainer server Kafka. Untuk informasi selengkapnya tentang pemasangan ikatan data, lihat dokumen Docker: Pemasangan ikatan.

Pengawasan kesehatan integrasi

Integrasi hosting Kafka secara otomatis menambahkan pemeriksaan kesehatan untuk sumber daya server Kafka. Pemeriksaan kesehatan memverifikasi bahwa produsen Kafka dengan nama koneksi yang ditentukan dapat terhubung dan menyimpan topik ke server Kafka.

Integrasi hosting bergantung pada paket 📦 AspNetCore.HealthChecks.Kafka NuGet.

integrasi Client

Untuk mulai menggunakan integrasi .NET AspireApache Kafka, instal paket NuGet 📦Aspire.Confluent.Kafka dalam proyek yang menggunakan klien, yaitu proyek aplikasi yang memanfaatkan klien Apache Kafka.

dotnet add package Aspire.Confluent.Kafka

Tambahkan produser Kafka

Dalam file Program.cs proyek yang menggunakan layanan klien Anda, panggil metode ekstensi AddKafkaProducer untuk mendaftarkan IProducer<TKey, TValue> supaya dapat digunakan melalui container injeksi ketergantungan. Metode ini mengambil dua parameter generik yang sesuai dengan jenis kunci dan jenis pesan untuk dikirim ke broker. Parameter generik ini digunakan oleh AddKafkaProducer untuk membuat instans ProducerBuilder<TKey, TValue>. Metode ini juga mengambil parameter nama koneksi.

builder.AddKafkaProducer<string, string>("messaging");

Anda kemudian bisa mendapatkan instans IProducer<TKey, TValue> menggunakan injeksi dependensi. Misalnya, untuk mengambil produsen dari IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Untuk informasi selengkapnya tentang pekerja, lihat layanan pekerja di .NET.

Tambahkan konsumen Kafka

Untuk mendaftarkan IConsumer<TKey, TValue> agar bisa digunakan melalui kontainer injeksi dependensi, panggil metode ekstensi AddKafkaConsumer di dalam file Program.cs dari proyek klien yang menggunakannya. Metode ini mengambil dua parameter generik yang sesuai dengan jenis kunci dan jenis pesan yang akan diterima dari broker. Parameter generik ini digunakan oleh AddKafkaConsumer untuk membuat instans ConsumerBuilder<TKey, TValue>. Metode ini juga mengambil parameter nama koneksi.

builder.AddKafkaConsumer<string, string>("messaging");

Anda kemudian dapat mendapatkan instans IConsumer<TKey, TValue> menggunakan injeksi dependensi. Misalnya, untuk mengambil data konsumen dari sebuah IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Tambahkan produsen atau konsumen Kafka dengan penggunaan kunci

Mungkin ada situasi di mana Anda ingin mendaftarkan beberapa produsen atau instans konsumen dengan nama koneksi yang berbeda. Untuk mendaftarkan produsen atau konsumen berjenis kunci Kafka, gunakan API yang sesuai.

Untuk informasi selengkapnya tentang layanan ber-key, lihat .NET injeksi dependensi: Layanan ber-key.

Konfigurasi

Integrasi .NET AspireApache Kafka menyediakan beberapa opsi untuk mengonfigurasi koneksi berdasarkan persyaratan dan konvensi proyek Anda.

Menggunakan string koneksi

Saat menggunakan string koneksi dari bagian konfigurasi ConnectionStrings, Anda dapat memberikan nama string koneksi saat memanggil builder.AddKafkaProducer() atau builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Kemudian string koneksi diambil dari bagian konfigurasi ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Nilai string koneksi ditetapkan pada properti BootstrapServers dari instans IProducer<TKey, TValue> atau IConsumer<TKey, TValue> yang dihasilkan. Untuk informasi selengkapnya, lihat BootstrapServers.

Menggunakan penyedia konfigurasi

Integrasi .NET AspireApache Kafka mendukung Microsoft.Extensions.Configuration. Memuat KafkaProducerSettings atau KafkaConsumerSettings dari konfigurasi dengan masing-masing menggunakan kunci Aspire:Confluent:Kafka:Producer dan Aspire.Confluent:Kafka:Consumer. Cuplikan berikut adalah contoh file appsettings.json yang mengonfigurasi beberapa opsi:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Properti Config dari bagian konfigurasi Aspire:Confluent:Kafka:Producer dan Aspire.Confluent:Kafka:Consumer masing-masing berhubungan dengan instans ProducerConfig dan ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> mengharuskan properti ClientId diatur untuk memungkinkan broker melacak offset pesan yang sudah dikonsumsi.

Untuk skema JSON integrasi klien Kafka lengkap, lihat Aspire. Confluent.Kafka/ConfigurationSchema.json.

Gunakan delegasi sebaris

Ada beberapa delegasi inline yang tersedia untuk mengonfigurasi berbagai opsi.

MengonfigurasiKafkaProducerSettings dan KafkaConsumerSettings

Anda dapat mengirimkan parameter Action<KafkaProducerSettings> configureSettings untuk menyiapkan beberapa atau semua opsi langsung, misalnya untuk menonaktifkan cek kesehatan dari kode:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Anda dapat mengonfigurasi sebuah "consumer" secara langsung dalam kode:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Mengonfigurasi ProducerBuilder<TKey, TValue> dan ConsumerBuilder<TKey, TValue>

Untuk mengonfigurasi pembangun Confluent.Kafka, berikan Action<ProducerBuilder<TKey, TValue>> (atau Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Saat mendaftarkan produsen dan konsumen, jika Anda perlu mengakses layanan yang terdaftar dalam kontainer DI, Anda dapat meneruskan Action<IServiceProvider, ProducerBuilder<TKey, TValue>> atau Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> masing-masing:

Pertimbangkan contoh pendaftaran produsen berikut:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Untuk informasi selengkapnya, lihat dokumentasi API ProducerBuilder<TKey, TValue> dan ConsumerBuilder<TKey, TValue>.

Client pemeriksaan kesehatan integrasi

Secara default, integrasi .NET.NET Aspire memungkinkan pemeriksaan kesehatan untuk semua layanan. Untuk informasi selengkapnya, lihat gambaran umum integrasi .NET.NET Aspire.

Integrasi .NET AspireApache Kafka menangani skenario pemeriksaan kesehatan berikut:

  • Menambahkan pengecekan kesehatan Aspire.Confluent.Kafka.Producer ketika KafkaProducerSettings.DisableHealthChecks adalah false.
  • Menambahkan pengecekan kesehatan Aspire.Confluent.Kafka.Consumer ketika KafkaConsumerSettings.DisableHealthChecks adalah false.
  • Terintegrasi dengan titik akhir HTTP /health, yang menentukan semua pemeriksaan kesehatan terdaftar harus lulus agar aplikasi dianggap siap menerima lalu lintas.

Pengamatan dan telemetri

.NET .NET Aspire integrasi secara otomatis menyiapkan konfigurasi Pengelogan, Pelacakan, dan Metrik, yang terkadang dikenal sebagai pilar pengamatan. Untuk informasi selengkapnya tentang pengamatan integrasi dan telemetri, lihat gambaran umum integrasi .NET.NET Aspire. Bergantung pada layanan pendukung, beberapa integrasi mungkin hanya mendukung beberapa fitur ini. Misalnya, beberapa integrasi mendukung pengelogan dan pelacakan, tetapi bukan metrik. Fitur telemetri juga dapat dinonaktifkan menggunakan teknik yang disajikan di bagian Konfigurasi .

Pencatatan

Integrasi .NET AspireApache Kafka menggunakan kategori log berikut:

  • Aspire.Confluent.Kafka

Menelusuri

Integrasi .NET AspireApache Kafka tidak memancarkan jejak terdistribusi.

Metrik

Integrasi .NET AspireApache Kafka memancarkan metrik berikut menggunakan OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received

Lihat juga