Широковещательные каналы в Orleans
Широковещательные каналы — это особый тип механизма вещания, который можно использовать для отправки сообщений всем подписчикам. В отличие от поставщиков потоковой передачи, широковещательные каналы не являются постоянными и не хранят сообщения, и они не являются заменой постоянных потоков. С широковещательными каналами зерна неявно подписаны на канал вещания и получают широковещательные сообщения от производителя. Это отделяет отправителя и получателя сообщения, что полезно для сценариев, когда отправитель и получатель не известны заранее.
Чтобы использовать широковещательный канал, необходимо настроить Orleans Streams и включить трансляцию в канале с помощью AddBroadcastChannel
конфигурации silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Пример сценария
Рассмотрим сценарий, в котором у вас есть зерно, которое должно получать обновления цен акций от поставщика цен на акции. Поставщик цен акций — это фоновая служба, которая публикует обновления цен на акции в канале трансляции. Зерна неявно подписаны на канал трансляции и получают обновленные цены на акции. На следующей схеме показан сценарий:
На предыдущей схеме показано следующее.
- Silo публикует обновления цен на акции в канале трансляции.
- Зерно подписывается на канал трансляции и получает обновления цен на акции.
- Клиент потребляет обновления цен на акции из акций зерна.
Широковещательный канал отделяет производителя и потребителя обновлений цен на акции. Производитель публикует обновления цен акций на канал трансляции, а потребитель подписывается на канал трансляции и получает обновления цен на акции.
Определение потребительского зерна
Чтобы использовать широковещательные сообщения канала, необходимо реализовать IOnBroadcastChannelSubscribed интерфейс. Реализация будет использовать IBroadcastChannelSubscription.Attach метод для подключения к каналу трансляции. Метод Attach
принимает параметр универсального типа для типа сообщения, который вы собираетесь получить. В следующем примере показано зерно, которое подписывается на широковещательный канал типа Stock
:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
В предыдущем коде:
- Зерно
LiveStockGrain
реализуетIOnBroadcastChannelSubscribed
интерфейс. - Метод
OnSubscribed
вызывается, когда зерно подписывается на канал трансляции. - Параметр
subscription
используется для вызоваAttach
метода для подключения к каналу трансляции.- Метод
OnStockUpdated
передаетсяAttach
в качестве обратного вызова, который запускается приStock
получении сообщения. - Метод
OnError
передаетсяAttach
в качестве обратного вызова, который запускается при возникновении ошибки.
- Метод
В этом примере зерна будут содержаться последние цены на акции, опубликованные на канале трансляции. Любой клиент, который просит это зерно за последнюю цену акций, получит последнюю цену из канала трансляции.
Публикация сообщений в канале трансляции
Чтобы опубликовать сообщения в канале трансляции, необходимо получить ссылку на канал трансляции. Для этого необходимо получить IBroadcastChannelProvider от .IClusterClient С помощью поставщика можно вызвать IBroadcastChannelProvider.GetChannelWriter метод для получения экземпляра IBroadcastChannelWriter<T>. Модуль записи используется для публикации сообщений в канале трансляции. В следующем примере показано, как публиковать сообщения в канале трансляции:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
В предыдущем коде:
- Класс
StockWorker
— это фоновая служба, которая публикует сообщения в канале трансляции. - Конструктор принимает и IClusterClient в
IStockClient
качестве параметров. - Из экземпляра GetBroadcastChannelProvider клиента кластера метод используется для получения поставщика широковещательного канала.
IStockClient
Используя класс,StockWorker
получает последнюю цену акций для символа акций.- Каждые 15 секунд
StockWorker
класс публикуетStock
сообщение в канале трансляции.
Публикация сообщений в широковещательном канале отделяется от потребительского зерна. Потребитель зерна подписывается на канал вещания и получает сообщения из канала вещания. Продюсер живет в силосе и отвечает за публикацию сообщений в канале вещания и не знает ничего о потреблении зерна.