Aracılığıyla paylaş


.NET Aspire Apache Kafka entegrasyonu

Içerir: Barındırma tümleştirmesi ve Client tümleştirmesi

Apache Kafka, açık kaynaklı bir dağıtılmış olay akış platformudur. Gerçek zamanlı veri işlem hatları ve akış uygulamaları oluşturmak için kullanışlıdır. .NET Aspire Apache Kafka tümleştirmesi, mevcut Kafka örneklerine bağlanmanıza veya .NETile docker.io/confluentinc/confluent-local'dan yeni örnekler oluşturmanıza olanak tanır.

Barındırma entegrasyonu

Apache Kafka barındırma entegrasyonu, Kafka sunucusunu KafkaServerResource türü olarak modeller. Bu türe erişmek için, 📦Aspireve Hosting.Kafka NuGet paketini uygulama konağı projesine yükleyin; ardından oluşturucu ile ekleyin.

dotnet add package Aspire.Hosting.Kafka

Daha fazla bilgi için bkz. dotnet add package veya .NET uygulamalarında paket bağımlılıklarını yönetme.

Kafka sunucu kaynağı ekleme

Uygulama barındırıcı projenizde, Kafka sunucu kaynağı eklemek için AddKafka örneğinde builder fonksiyonunu çağırın.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

.NET .NET Aspire, docker.io/confluentinc/confluent-local görüntüsüyle önceki örnekte gösterildiği gibi uygulama konağına bir kapsayıcı görüntüsü eklediğinde, yerel makinenizde yeni bir Kafka sunucu örneği oluşturur. Kafka sunucunuza bir başvuru olarak (kafka değişkeni) ExampleProject eklenir. Kafka sunucu kaynağı varsayılan bağlantı noktalarını içerir

WithReference yöntemi, adı ExampleProjectolan "kafka"'de bir bağlantıyı yapılandırıyor. Daha fazla bilgi için bkz. Kapsayıcı kaynak yaşam döngüsü.

Bahşiş

Mevcut bir Kafka sunucusuna bağlanmayı tercih ederseniz bunun yerine AddConnectionString çağırın. Daha fazla bilgi için bkz. Var olan kaynaklara başvurma.

Kafka kullanıcı arabirimi ekleme

kafka sunucu kaynağına Kafka kullanıcı arabirimi eklemek için WithKafkaUI yöntemini çağırın:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka kullanıcı arabirimi, Apache Kafka kümelerini izlemek ve yönetmek için ücretsiz, açık kaynak bir web kullanıcı arabirimidir. .NET .NET Aspire, Kafka kullanıcı arabirimini çalıştıran uygulama konağına başka bir kapsayıcı görüntüsü docker.io/provectuslabs/kafka-ui ekler.

Kafka arayüzü sunucu bağlantı noktasını değiştirin

Kafka UI ana bilgisayar portunu değiştirmek için WithHostPort yöntemine bir çağrı zincirleme adımını izleyin.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka kullanıcı arabirimine, önceki örnekte http://localhost:9100 konumundan erişilebilir.

Veri hacmi ile Kafka sunucu kaynağı ekle

Kafka sunucu kaynağına veri birimi eklemek için Kafka sunucu kaynağında WithDataVolume yöntemini çağırın:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Veri birimi, Kafka sunucu verilerini kapsayıcısının yaşam döngüsü dışında kalıcı hale getirmek için kullanılır. Veri birimi Kafka sunucu kapsayıcısında /var/lib/kafka/data yoluna bağlanır ve name parametresi sağlanmazsa ad rastgele oluşturulur. Veri hacimleri ve bağlamalar üzerine neden tercih edildiğine ilişkin daha fazla bilgi için bkz. Docker dokümanlar: Hacimler.

Veri bağlama montajı ile Kafka sunucu kaynağı ekleyin

Kafka sunucu kaynağına veri bağlaması eklemek için WithDataBindMount yöntemini çağırın:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Önemli

Veri bağlama bağlamaları, daha iyi performans, taşınabilirlik ve güvenlik sunanbirimleriyle karşılaştırıldığında sınırlı işlevselliğe sahiptir ve bu da üretim ortamları için daha uygun olmasını sağlar. Ancak bind mount'lar, gerçek zamanlı değişikliklerin gerektiği geliştirme ve test için ideal olan konak sistemindeki dosyalara doğrudan erişim ve değişiklik yapma olanağı sağlar.

Veri bağlama bağlantıları, Kafka sunucu verilerinin kalıcılığını sağlamak amacıyla kapsayıcı yeniden başlatmaları arasında konak makinenin dosya sistemini kullanır. Veri bağlama noktası, Kafka sunucu kapsayıcısında ana makinedeki Windows C:\Kafka\Data yolu (veya /Kafka/Dataüzerinde Unix) üzerine monte edilir. Veri bağlama montajları hakkında daha fazla bilgi için Docker belgelerine bakın: Bağlama montajları.

Barındırma tümleştirme durumu denetimleri

Kafka barındırma tümleştirmesi, Kafka sunucu kaynağı için otomatik olarak bir sistem durumu denetimi ekler. Sağlık kontrolü, belirtilen bağlantı adına sahip bir Kafka üreticisinin Kafka sunucusuna bağlanabildiğini ve bir konuyu kalıcı hale getirebildiğini doğrular.

Barındırma tümleştirmesi, 📦 AspNetCore.HealthChecks.Kafka NuGet paketine dayanır.

Client entegrasyonu

.NET Aspire Apache Kafka tümleştirmesini kullanmaya başlamak için, Apache Kafka istemcisini kullanan uygulamanın projesinde, yani istemciyi kullanan projede, 📦Aspire.Confluent.Kafka NuGet paketini yükleyin.

dotnet add package Aspire.Confluent.Kafka

Kafka üreticisi ekleme

İstemci kullanan projenizin Program.cs dosyasında, bağımlılık enjeksiyonu kapsayıcısı aracılığıyla kullanmak üzere bir AddKafkaProducer kaydetmek için IProducer<TKey, TValue> uzantı yöntemini çağırın. yöntemi, anahtarın türüne ve aracıya gönderilecek iletinin türüne karşılık gelen iki genel parametre alır. Bu genel parametreler AddKafkaProducer tarafından ProducerBuilder<TKey, TValue>örneği oluşturmak için kullanılır. Bu yöntem ayrıca bağlantı adı parametresini alır.

builder.AddKafkaProducer<string, string>("messaging");

Daha sonra bağımlılık ekleme kullanarak IProducer<TKey, TValue> örneğini alabilirsiniz. Örneğin, üreticiye bir IHostedService'dan erişmek için:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Çalışanlar hakkında daha fazla bilgi için bkz. Çalışan hizmetleri.

Kafka tüketicisi ekleme

Bağımlılık ekleme kapsayıcısı aracılığıyla kullanmak üzere bir IConsumer<TKey, TValue> kaydetmek için, istemcinin kullandığı projedeki AddKafkaConsumer dosyasında yer alan Program.cs uzantı yöntemini çağırın. Yöntem, anahtar türüne ve aracıdan alınacak mesaj türüne karşılık gelen iki adet genel parametreyi kullanır. Bu genel parametreler AddKafkaConsumer tarafından ConsumerBuilder<TKey, TValue>örneği oluşturmak için kullanılır. Bu yöntem ayrıca bağlantı adı parametresini alır.

builder.AddKafkaConsumer<string, string>("messaging");

Daha sonra bağımlılık ekleme kullanarak IConsumer<TKey, TValue> örneğini alabilirsiniz. Örneğin, tüketiciyi bir IHostedService'den geri almak için:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Anahtarlı Kafka üreticilerini veya tüketicilerini ekleyin

Farklı bağlantı adlarına sahip birden çok üretici veya tüketici örneğini kaydetmek istediğiniz durumlar olabilir. Anahtarlı Kafka üreticilerini veya tüketicilerini kaydetmek için uygun API'yi çağırın:

Anahtarlı hizmetler hakkında daha fazla bilgi için bkz. .NET bağımlılık ekleme: Anahtarlı hizmetler.

Konfigürasyon

.NET Aspire Apache Kafka tümleştirmesi, projenizin gereksinimlerine ve kurallarına göre bağlantıyı yapılandırmak için birden çok seçenek sağlar.

Bağlantı dizesi kullanma

ConnectionStrings yapılandırma bölümünden bir bağlantı dizesi kullanırken, builder.AddKafkaProducer() veya builder.AddKafkaProducer()çağırırken bağlantı dizesinin adını sağlayabilirsiniz:

builder.AddKafkaProducer<string, string>("kafka-producer");

Ardından bağlantı dizesi ConnectionStrings yapılandırma bölümünden alınır:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Bağlantı dizesi değeri, üretilen BootstrapServers veya IProducer<TKey, TValue> örneğinin IConsumer<TKey, TValue> özelliğine ayarlanır. Daha fazla bilgi için bkz. BootstrapServers.

Yapılandırma sağlayıcılarını kullanma

.NET Aspire Apache Kafka tümleştirmesi Microsoft.Extensions.Configurationdesteklemektedir. KafkaProducerSettings ve KafkaConsumerSettings anahtarlarını sırasıyla kullanarak yapılandırmadan Aspire:Confluent:Kafka:Producer veya Aspire.Confluent:Kafka:Consumer'i yükler. Aşağıdaki kod parçacığı, bazı seçenekleri yapılandıran bir appsettings.json dosyası örneğidir:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Hem Config hem de Aspire:Confluent:Kafka:Producer yapılandırma bölümlerinin Aspire.Confluent:Kafka:Consumer özellikleri sırasıyla ProducerConfig ve ConsumerConfigörneklerine bağlanır.

Confluent.Kafka.Consumer<TKey, TValue>, komisyoncunun tüketilen mesaj ofsetlerini izlemesine izin vermek için ClientId özelliğinin ayarlanmasını gerektirir.

Kafka istemci tümleştirmesi JSON şemasının tamamı için bkz. Aspire. Confluent.Kafka/ConfigurationSchema.json.

Satır içi temsilcileri kullanın

Çeşitli seçenekleri yapılandırmak için kullanılabilecek birkaç satır içi temsilci vardır.

KafkaProducerSettings ve KafkaConsumerSettings yapılandırma

Action<KafkaProducerSettings> configureSettings temsilcisini geçirerek satır içi seçeneklerin bazılarını veya tümünü ayarlayabilirsiniz; örneğin, koddan sistem durumu denetimlerini devre dışı bırakmak için:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Koddan satır içi bir tüketici yapılandırabilirsiniz.

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue> ve ConsumerBuilder<TKey, TValue>'i yapılandır

Confluent.Kafka oluşturucularını yapılandırmak için bir Action<ProducerBuilder<TKey, TValue>> (ya da Action<ConsumerBuilder<TKey, TValue>>) geçirin.

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Üreticileri ve tüketicileri kaydederken, DI kapsayıcısında kayıtlı bir hizmete erişmeniz gerekiyorsa sırasıyla bir Action<IServiceProvider, ProducerBuilder<TKey, TValue>> veya Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> geçirebilirsiniz:

Aşağıdaki üretici kayıt örneğini göz önünde bulundurun:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Daha fazla bilgi için ProducerBuilder<TKey, TValue> ve ConsumerBuilder<TKey, TValue> API belgelerine bakın.

entegrasyon sağlık kontrolleri Client

Varsayılan olarak, .NET.NET Aspire tümleştirmeleri tüm hizmetler için sağlık kontrollerini etkinleştirir. Daha fazla bilgi için bkz. .NET.NET Aspire tümleştirmelere genel bakış.

.NET Aspire Apache Kafka tümleştirmesi aşağıdaki sağlık kontrolü senaryolarını ele alır:

  • KafkaProducerSettings.DisableHealthChecks false olduğunda Aspire.Confluent.Kafka.Producer sağlık kontrolünü ekler.
  • KafkaConsumerSettings.DisableHealthChecks false olduğunda Aspire.Confluent.Kafka.Consumer sağlık kontrolü eklenir.
  • Uygulamanın trafiği kabul etmeye hazır olarak kabul edilmesi için tüm kayıtlı sistem durumu denetimlerinin geçmesi gerektiğini belirten /health HTTP uç noktasıyla tümleşir.

Gözlemlenebilirlik ve telemetri

.NET .NET Aspire entegrasyonları, bazen gözlemlenebilirliğin temel unsurları olarak bilinen Loglama, İzleme ve Metrik yapılandırmalarını otomatik olarak ayarlar. Tümleştirme gözlemlenebilirliği ve telemetri hakkında daha fazla bilgi için bkz. tümleştirmelere genel bakış. Yedekleme hizmetine bağlı olarak, bazı tümleştirmeler bu özelliklerden yalnızca bazılarını destekleyemeyebilir. Örneğin, bazı entegrasyonlar günlüğe kaydetme ve izlemeyi destekler, ancak ölçümleri desteklemezler. Telemetri özellikleri, Yapılandırma bölümünde sunulan teknikler kullanılarak da devre dışı bırakılabilir.

Kayıt tutma

.NET Aspire Apache Kafka tümleştirmesi aşağıdaki günlük kategorilerini kullanır:

  • Aspire.Confluent.Kafka

Takip Etme

.NET Aspire Apache Kafka tümleştirmesi dağıtılmış izleri yaymaz.

Ölçüm

.NET Aspire Apache Kafka tümleştirmesi, OpenTelemetrykullanarak aşağıdaki ölçümleri yayar:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received

Ayrıca bkz.