Bagikan melalui


Menyiarkan saluran di Orleans

Saluran siaran adalah jenis mekanisme penyiaran khusus yang dapat digunakan untuk mengirim pesan ke semua pelanggan. Tidak seperti penyedia streaming, saluran siaran tidak persisten dan tidak menyimpan pesan, dan mereka bukan pengganti aliran persisten. Dengan saluran siaran, biji-bijian secara implisit berlangganan saluran siaran dan menerima pesan siaran dari produser. Ini memisahkan pengirim dan penerima pesan, yang berguna untuk skenario di mana pengirim dan penerima tidak diketahui sebelumnya.

Untuk menggunakan saluran siaran, Anda harus mengonfigurasi Orleans Streams lalu mengaktifkan siaran di saluran Anda menggunakan AddBroadcastChannel selama konfigurasi silo.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Contoh skenario

Pertimbangkan skenario di mana Anda memiliki butiran yang perlu menerima pembaruan harga saham dari penyedia harga saham. Penyedia harga saham adalah layanan latar belakang yang menerbitkan pembaruan harga saham ke saluran siaran. Butiran secara implisit berlangganan saluran siaran dan menerima harga saham yang diperbarui. Diagram berikut menunjukkan skenario:

Diagram harga saham yang menggambarkan silo, biji-bijian stok, dan mengonsumsi klien dalam arsitektur saluran siaran sederhana.

Pada diagram sebelumnya:

  • Silo menerbitkan pembaruan harga saham ke saluran siaran.
  • Biji-bijian berlangganan saluran siaran dan menerima pembaruan harga saham.
  • Klien mengonsumsi pembaruan harga saham dari butiran stok.

Saluran siaran memisahkan produsen dan konsumen pembaruan harga saham. Produsen menerbitkan pembaruan harga saham ke saluran siaran, dan konsumen berlangganan saluran siaran dan menerima pembaruan harga saham.

Menentukan butir konsumen

Untuk menggunakan pesan saluran siaran, biji-bijian Anda perlu mengimplementasikan IOnBroadcastChannelSubscribed antarmuka. Implementasi Anda akan menggunakan IBroadcastChannelSubscription.Attach metode untuk melampirkan ke saluran siaran. Metode ini Attach mengambil parameter jenis generik untuk jenis pesan yang akan Anda terima. Contoh berikut menunjukkan biji-bijian yang berlangganan saluran siaran jenis Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

Dalam kode sebelumnya:

  • Butir LiveStockGrain mengimplementasikan IOnBroadcastChannelSubscribed antarmuka.
  • Metode OnSubscribed ini dipanggil ketika biji-bijian berlangganan saluran siaran.
  • Parameter subscription digunakan untuk memanggil Attach metode untuk melampirkan ke saluran siaran.
    • Metode OnStockUpdated ini diteruskan sebagai Attach panggilan balik yang diaktifkan ketika Stock pesan diterima.
    • Metode OnError ini diteruskan ke Attach sebagai panggilan balik yang diaktifkan ketika kesalahan terjadi.

Contoh butir ini akan berisi harga saham terbaru seperti yang dipublikasikan di saluran siaran. Setiap klien yang meminta biji-bijian ini untuk harga saham terbaru akan mendapatkan harga terbaru dari saluran siaran.

Menerbitkan pesan ke saluran siaran

Untuk menerbitkan pesan ke saluran siaran, Anda perlu mendapatkan referensi ke saluran siaran. Untuk melakukan ini, Anda perlu mendapatkan IBroadcastChannelProvider dari IClusterClient. Dengan penyedia, Anda dapat memanggil IBroadcastChannelProvider.GetChannelWriter metode untuk mendapatkan instans .IBroadcastChannelWriter<T> Penulis digunakan untuk menerbitkan pesan ke saluran siaran. Contoh berikut menunjukkan cara menerbitkan pesan ke saluran siaran:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

Dalam kode sebelumnya:

  • Kelas StockWorker adalah layanan latar belakang yang menerbitkan pesan ke saluran siaran.
  • Konstruktor mengambil IStockClient parameter dan IClusterClient sebagai .
  • Dari instans klien kluster, GetBroadcastChannelProvider metode ini digunakan untuk mendapatkan penyedia saluran siaran.
  • IStockClientMenggunakan , StockWorker kelas mendapatkan harga saham terbaru untuk simbol saham.
  • Setiap 15 detik, StockWorker kelas menerbitkan Stock pesan ke saluran siaran.

Penerbitan pesan ke saluran siaran dipisahkan dari butir konsumen. Butir konsumen berlangganan saluran siaran dan menerima pesan dari saluran siaran. Produser tinggal di silo dan bertanggung jawab untuk menerbitkan pesan ke saluran siaran dan tidak tahu apa-apa tentang mengkonsumsi biji-bijian.

Lihat juga