Condividi tramite


Canali di trasmissione in Orleans

I canali di trasmissione sono un tipo speciale di meccanismo di trasmissione che è possibile usare per inviare messaggi a tutti i sottoscrittori. A differenza dei provider di flussi, i canali di trasmissione non sono persistenti e non archiviano i messaggi. Non sostituiscono quindi i flussi persistenti. Per i grani viene eseguita in modo implicito la sottoscrizione dei canali e i grani ricevono quindi i messaggi trasmessi da un producer. Mittente e destinatario del messaggio sono quindi separati e ciò è utile per gli scenari in cui questi non sono noti in anticipo.

Per usare il canale di trasmissione, è necessario configurare flussi Orleans e quindi abilitare la trasmissione nel canale usando AddBroadcastChannel durante la configurazione del silo.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Scenario di esempio

Si consideri uno scenario in cui un grano deve ricevere gli aggiornamenti sui prezzi azionari da un provider di informazioni di questo tipo. Il provider è un servizio in background che pubblica gli aggiornamenti dei prezzi azionari in un canale di trasmissione. Per i grani viene eseguita in modo implicito la sottoscrizione del canale di trasmissione e i grani ricevono quindi i prezzi azionari aggiornati. Il diagramma seguente illustra questo scenario:

Diagramma dei prezzi azionari che illustra un silo, uno stock grain e un client di consumo in un’architettura di canale di trasmissione semplice.

Nel diagramma precedente:

  • Il silo pubblica gli aggiornamenti dei prezzi azionari nel canale di trasmissione.
  • Il grano sottoscrive il canale di trasmissione e riceve gli aggiornamenti dei prezzi azionari.
  • Il client utilizza gli aggiornamenti dei prezzi azionari dal grano relativo alle azioni.

Il canale di trasmissione separa il producer e il consumer degli aggiornamenti dei prezzi azionari. Il producer pubblica gli aggiornamenti dei prezzi azionari nel canale di trasmissione e il consumer sottoscrive il canale di trasmissione e riceve gli aggiornamenti dei prezzi azionari.

Definire un grano consumer

Per utilizzare i messaggi del canale di trasmissione, il grano deve implementare l'interfaccia IOnBroadcastChannelSubscribed. L'implementazione userà il metodo IBroadcastChannelSubscription.Attach per il collegamento al canale di trasmissione. Il metodo Attach accetta un parametro di tipo generico per il tipo di messaggio che verrà ricevuto. L'esempio seguente mostra un grano che effettua la sottoscrizione a un canale di trasmissione di tipo Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

Nel codice precedente:

  • Il grano LiveStockGrain implementa l'interfaccia IOnBroadcastChannelSubscribed.
  • Il metodo OnSubscribed viene chiamato quando il grano effettua la sottoscrizione al canale di trasmissione.
  • Il parametro subscription viene usato per chiamare il metodo Attach per il collegamento al canale di trasmissione.
    • Il metodo OnStockUpdated viene passato a Attach come callback che viene generato quando viene ricevuto il messaggio Stock.
    • Il metodo OnError viene passato a Attach come callback che viene generato quando si verifica un errore.

Questo grano di esempio conterrà i prezzi azionari più recenti pubblicati nel canale di trasmissione. Qualsiasi client che chiede a questo grano il prezzo azionario più recente otterrà il prezzo indicato dal canale di trasmissione.

Pubblicare messaggi in un canale di trasmissione

Per pubblicare messaggi nel canale di trasmissione, è necessario ottenere un riferimento al canale. A tale scopo, è necessario ottenere IBroadcastChannelProvider da IClusterClient. Con il provider è possibile chiamare il metodo IBroadcastChannelProvider.GetChannelWriter per ottenere un'istanza di IBroadcastChannelWriter<T>. Il writer viene usato per pubblicare messaggi nel canale di trasmissione. L'esempio seguente mostra come pubblicare messaggi nel canale di trasmissione:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

Nel codice precedente:

  • La classe StockWorker è un servizio in background che pubblica messaggi nel canale di trasmissione.
  • Il costruttore accetta IStockClient e IClusterClient come parametri.
  • Dall'istanza client del cluster viene usato il metodo GetBroadcastChannelProvider per ottenere il provider del canale di trasmissione.
  • Usando IStockClient, la classe StockWorker ottiene il prezzo aggiornato per un simbolo azionario.
  • Ogni 15 secondi, la classe StockWorker pubblica un messaggio Stock nel canale di trasmissione.

La pubblicazione di messaggi in un canale di trasmissione è separata dal grano consumer. Il grano consumer effettua la sottoscrizione al canale di trasmissione e riceve messaggi dal canale di trasmissione. Il produttore si trova in un silo ed è responsabile della pubblicazione dei messaggi nel canale di trasmissione, ma non conosce i grani consumer.

Vedi anche