Canali di trasmissione in Orleans
I canali di trasmissione sono un tipo speciale di meccanismo di trasmissione che è possibile usare per inviare messaggi a tutti i sottoscrittori. A differenza dei provider di flussi, i canali di trasmissione non sono persistenti e non archiviano i messaggi. Non sostituiscono quindi i flussi persistenti. Per i grani viene eseguita in modo implicito la sottoscrizione dei canali e i grani ricevono quindi i messaggi trasmessi da un producer. Mittente e destinatario del messaggio sono quindi separati e ciò è utile per gli scenari in cui questi non sono noti in anticipo.
Per usare il canale di trasmissione, è necessario configurare flussi Orleans e quindi abilitare la trasmissione nel canale usando AddBroadcastChannel
durante la configurazione del silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Scenario di esempio
Si consideri uno scenario in cui un grano deve ricevere gli aggiornamenti sui prezzi azionari da un provider di informazioni di questo tipo. Il provider è un servizio in background che pubblica gli aggiornamenti dei prezzi azionari in un canale di trasmissione. Per i grani viene eseguita in modo implicito la sottoscrizione del canale di trasmissione e i grani ricevono quindi i prezzi azionari aggiornati. Il diagramma seguente illustra questo scenario:
Nel diagramma precedente:
- Il silo pubblica gli aggiornamenti dei prezzi azionari nel canale di trasmissione.
- Il grano sottoscrive il canale di trasmissione e riceve gli aggiornamenti dei prezzi azionari.
- Il client utilizza gli aggiornamenti dei prezzi azionari dal grano relativo alle azioni.
Il canale di trasmissione separa il producer e il consumer degli aggiornamenti dei prezzi azionari. Il producer pubblica gli aggiornamenti dei prezzi azionari nel canale di trasmissione e il consumer sottoscrive il canale di trasmissione e riceve gli aggiornamenti dei prezzi azionari.
Definire un grano consumer
Per utilizzare i messaggi del canale di trasmissione, il grano deve implementare l'interfaccia IOnBroadcastChannelSubscribed. L'implementazione userà il metodo IBroadcastChannelSubscription.Attach per il collegamento al canale di trasmissione. Il metodo Attach
accetta un parametro di tipo generico per il tipo di messaggio che verrà ricevuto. L'esempio seguente mostra un grano che effettua la sottoscrizione a un canale di trasmissione di tipo Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
Nel codice precedente:
- Il grano
LiveStockGrain
implementa l'interfacciaIOnBroadcastChannelSubscribed
. - Il metodo
OnSubscribed
viene chiamato quando il grano effettua la sottoscrizione al canale di trasmissione. - Il parametro
subscription
viene usato per chiamare il metodoAttach
per il collegamento al canale di trasmissione.- Il metodo
OnStockUpdated
viene passato aAttach
come callback che viene generato quando viene ricevuto il messaggioStock
. - Il metodo
OnError
viene passato aAttach
come callback che viene generato quando si verifica un errore.
- Il metodo
Questo grano di esempio conterrà i prezzi azionari più recenti pubblicati nel canale di trasmissione. Qualsiasi client che chiede a questo grano il prezzo azionario più recente otterrà il prezzo indicato dal canale di trasmissione.
Pubblicare messaggi in un canale di trasmissione
Per pubblicare messaggi nel canale di trasmissione, è necessario ottenere un riferimento al canale. A tale scopo, è necessario ottenere IBroadcastChannelProvider da IClusterClient. Con il provider è possibile chiamare il metodo IBroadcastChannelProvider.GetChannelWriter per ottenere un'istanza di IBroadcastChannelWriter<T>. Il writer viene usato per pubblicare messaggi nel canale di trasmissione. L'esempio seguente mostra come pubblicare messaggi nel canale di trasmissione:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
Nel codice precedente:
- La classe
StockWorker
è un servizio in background che pubblica messaggi nel canale di trasmissione. - Il costruttore accetta
IStockClient
e IClusterClient come parametri. - Dall'istanza client del cluster viene usato il metodo GetBroadcastChannelProvider per ottenere il provider del canale di trasmissione.
- Usando
IStockClient
, la classeStockWorker
ottiene il prezzo aggiornato per un simbolo azionario. - Ogni 15 secondi, la classe
StockWorker
pubblica un messaggioStock
nel canale di trasmissione.
La pubblicazione di messaggi in un canale di trasmissione è separata dal grano consumer. Il grano consumer effettua la sottoscrizione al canale di trasmissione e riceve messaggi dal canale di trasmissione. Il produttore si trova in un silo ed è responsabile della pubblicazione dei messaggi nel canale di trasmissione, ma non conosce i grani consumer.