次の方法で共有


Orleans のブロードキャスト チャネル

ブロードキャスト チャネルは、すべてのサブスクライバーにメッセージを送信するために使用できる特殊な種類のブロードキャスト メカニズムです。 ストリーミング プロバイダーとは異なり、ブロードキャスト チャネルは永続的ではなく、メッセージを保存せず、永続的なストリームに代わるものではありません。 ブロードキャスト チャネルでは、グレインはブロードキャスト チャネルに暗黙的に登録され、プロデューサーからブロードキャスト メッセージを受信します。 これにより、メッセージの送信者と受信者が切り離されます。これは、送信者と受信者が事前に認識されていないシナリオに役立ちます。

ブロードキャスト チャネルを使用するには、Orleans ストリームを構成してから、サイロ構成時に AddBroadcastChannel を使用してチャネルでのブロードキャストを有効にする必要があります。

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

サンプル シナリオ

ある株価プロバイダーから、株価更新情報を受け取る必要があるグレインがあるシナリオを考えてみましょう。 株価プロバイダーは、株価の更新情報をブロードキャスト チャネルに公開するバックグラウンド サービスです。 グレインは暗黙的にブロードキャスト チャネルに登録され、更新された株価を受け取ります。 次の図は、シナリオを示しています。

シンプルなブロードキャスト チャネル アーキテクチャにおけるサイロ、株グレイン、使用するクライアントを示す株価の図。

前の図で:

  • サイロにより、株価の更新情報がブロードキャスト チャネルに公開されます。
  • グレインでブロードキャスト チャネルが登録され、株価の更新情報が受けとられます。
  • クライアントにより、株価グレインからの株価更新情報が使用されます。

ブロードキャスト チャネルによって、株価更新プログラムのプロデューサーとコンシューマーが切り離されます。 プロデューサーは株価の更新をブロードキャスト チャネルに公開し、コンシューマーはブロードキャスト チャネルを登録して株価の更新を受け取ります。

コンシューマー グレインを定義する

ブロードキャスト チャネル メッセージを使用するには、グレインで IOnBroadcastChannelSubscribed インターフェイスを実装する必要があります。 実装では、IBroadcastChannelSubscription.Attach メソッドを使用してブロードキャスト チャネルにアタッチします。 Attach メソッドは、受信するメッセージの種類のジェネリック型パラメーターを受け取ります。 次の例は、Stock 型のブロードキャスト チャネルを登録するグレインを示しています。

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

上のコードでは以下の操作が行われます。

  • LiveStockGrain グレインにより、IOnBroadcastChannelSubscribed インターフェイスが実装されます。
  • OnSubscribed メソッドは、グレインがブロードキャスト チャネルを登録するときに呼び出されます。
  • subscription パラメーターは、ブロードキャスト チャネルにアタッチする Attach メソッドを呼び出すために使用されます。
    • OnStockUpdated メソッドは、Stock メッセージの受信時に発生するコールバックとして Attach に渡されます。
    • OnError メソッドは、エラーが発生したときに発生するコールバックとして Attach に渡されます。

この例のグレインには、ブロードキャスト チャネルで公開されている最新の株価が含まれます。 このグレインに最新の株価を求めるクライアントは、ブロードキャスト チャネルから最新の株価を取得します。

ブロードキャスト チャネルにメッセージを発行する

ブロードキャスト チャネルにメッセージを発行するには、ブロードキャスト チャネルへの参照を取得する必要があります。 これを行うには、IClusterClient から IBroadcastChannelProvider を取得する必要があります。 プロバイダーを使用すると、IBroadcastChannelProvider.GetChannelWriter メソッドを呼び出して、IBroadcastChannelWriter<T> のインスタンスを取得できます。 ライターは、ブロードキャスト チャネルにメッセージを発行するために使用されます。 次の例は、ブロードキャスト チャネルにメッセージを発行する方法を示しています。

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

上のコードでは以下の操作が行われます。

  • StockWorker クラスは、ブロードキャスト チャネルにメッセージを発行するバックグラウンド サービスです。
  • コンストラクターは、パラメーターとして IStockClientIClusterClient を受け取ります。
  • クラスター クライアント インスタンスから、GetBroadcastChannelProvider メソッドを使用してブロードキャスト チャネル プロバイダーを取得します。
  • IStockClient を使用すると、StockWorker クラスは銘柄記号の最新の株価を取得します。
  • StockWorker クラスは 15 秒ごとに、Stock メッセージをブロードキャスト チャネルに発行します。

ブロードキャスト チャネルへのメッセージの発行は、コンシューマー グレインから切り離されます。 コンシューマー グレインは、ブロードキャスト チャネルを登録し、ブロードキャスト チャネルからメッセージを受信します。 プロデューサーはサイロに存在していて、ブロードキャスト チャネルへのメッセージの発行を担っており、グレインの使用については何も認識していません。

関連項目