Orleans Краткое руководство по потоковой передаче
В этом руководстве показано, как быстро настроить и использовать Orleans Потоки. Дополнительные сведения о функциях потоковой передачи см. в других частях этой документации.
Требуемые конфигурации
В этом руководстве вы будете использовать поток на основе памяти, использующий обмен сообщениями с зерном для отправки потоковых данных подписчикам. Поставщик хранилища в памяти будет использоваться для хранения списков подписок. Использование механизмов на основе памяти для потоковой передачи и хранения предназначено только для локальной разработки и тестирования и не предназначено для рабочих сред.
В silo, где silo
находится ISiloBuilder, вызов:AddMemoryStreams
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
В клиенте кластера, где client
есть IClientBuilderвызов , .AddMemoryStreams
client.AddMemoryStreams("StreamProvider");
В этом руководстве мы будем использовать простой поток на основе сообщений, использующий обмен сообщениями с зерном для отправки потоковых данных подписчикам. Мы будем использовать поставщик хранилища в памяти для хранения списков подписок, поэтому это не мудрый выбор для реальных рабочих приложений.
В silo, где hostBuilder
находится ISiloHostBuilder
, вызов:AddSimpleMessageStreamProvider
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
В клиенте кластера, где clientBuilder
есть IClientBuilder
вызов , .AddSimpleMessageStreamProvider
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Примечание.
По умолчанию сообщения, передаваемые по простому потоку сообщений, считаются неизменяемыми и могут передаваться по ссылке на другие зерна. Чтобы отключить это поведение, необходимо настроить поставщик SMS для отключения SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Вы можете создавать потоки, отправлять данные, используя их в качестве производителей, а также получать данные в качестве подписчиков.
Создание событий
Это относительно легко создавать события для потоков. Сначала необходимо получить доступ к поставщику потоков, определенному в конфигурации ранее ("StreamProvider"
), а затем выбрать поток и отправить данные в него.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Это относительно легко создавать события для потоков. Сначала необходимо получить доступ к поставщику потоков, определенному в конфигурации ранее ("SMSProvider"
), а затем выбрать поток и отправить данные в него.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Как видите, в потоке есть GUID и пространство имен. Это позволит легко определить уникальные потоки. Например, пространство имен для комнаты чата может быть "Комнаты", а GUID может быть идентификатором GUID RoomGrain.
Здесь мы используем GUID некоторых известных комнат чата. OnNextAsync
Используя метод потока, мы можем отправить данные в него. Давайте сделаем это внутри таймера, используя случайные числа. Вы также можете использовать любой другой тип данных для потока.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Подписка на данные потоковой передачи и получение
Для получения данных можно использовать неявные и явные подписки, которые подробно описаны в явных и неявных подписках. В этом примере используются неявные подписки, которые проще. Если тип зерна хочет неявно подписаться на поток, он использует атрибут [Неявное пространство_имен)].
В вашем случае определите следующее ReceiverGrain
:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Всякий раз, когда данные передаются в потоки пространства RANDOMDATA
имен, как у нас есть в таймере, набор типов ReceiverGrain
с тем же Guid
потоком будет получать сообщение. Даже если активации зерна в настоящее время отсутствуют, среда выполнения автоматически создаст новую и отправит в нее сообщение.
Для этого необходимо завершить процесс подписки, задав метод OnNextAsync
получения данных. Для этого мы ReceiverGrain
должны назвать что-то подобное в его OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Все готово! Теперь единственное требование заключается в том, что что-то активирует создание производителя зерна, а затем он зарегистрирует таймер и начнет отправлять случайные инты всем заинтересованным сторонам.
Опять же, это руководство пропускает много деталей и подходит только для отображения большого рисунка. Ознакомьтесь с другими частями этого руководства и других ресурсов rX, чтобы получить хорошее представление о доступных и способах.
Реактивное программирование может быть очень мощным подходом к решению многих проблем. Например, можно использовать LINQ в подписчике для фильтрации номеров и делать все интересные вещи.