Бөлісу құралы:


Orleans Краткое руководство по потоковой передаче

В этом руководстве показано, как быстро настроить и использовать Orleans Потоки. Дополнительные сведения о функциях потоковой передачи см. в других частях этой документации.

Требуемые конфигурации

В этом руководстве вы будете использовать поток на основе памяти, использующий обмен сообщениями с зерном для отправки потоковых данных подписчикам. Поставщик хранилища в памяти будет использоваться для хранения списков подписок. Использование механизмов на основе памяти для потоковой передачи и хранения предназначено только для локальной разработки и тестирования и не предназначено для рабочих сред.

В silo, где silo находится ISiloBuilder, вызов:AddMemoryStreams

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

В клиенте кластера, где client есть IClientBuilderвызов , .AddMemoryStreams

client.AddMemoryStreams("StreamProvider");

В этом руководстве мы будем использовать простой поток на основе сообщений, использующий обмен сообщениями с зерном для отправки потоковых данных подписчикам. Мы будем использовать поставщик хранилища в памяти для хранения списков подписок, поэтому это не мудрый выбор для реальных рабочих приложений.

В silo, где hostBuilder находится ISiloHostBuilder, вызов:AddSimpleMessageStreamProvider

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

В клиенте кластера, где clientBuilder есть IClientBuilderвызов , .AddSimpleMessageStreamProvider

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Примечание.

По умолчанию сообщения, передаваемые по простому потоку сообщений, считаются неизменяемыми и могут передаваться по ссылке на другие зерна. Чтобы отключить это поведение, необходимо настроить поставщик SMS для отключения SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Вы можете создавать потоки, отправлять данные, используя их в качестве производителей, а также получать данные в качестве подписчиков.

Создание событий

Это относительно легко создавать события для потоков. Сначала необходимо получить доступ к поставщику потоков, определенному в конфигурации ранее ("StreamProvider"), а затем выбрать поток и отправить данные в него.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Это относительно легко создавать события для потоков. Сначала необходимо получить доступ к поставщику потоков, определенному в конфигурации ранее ("SMSProvider"), а затем выбрать поток и отправить данные в него.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Как видите, в потоке есть GUID и пространство имен. Это позволит легко определить уникальные потоки. Например, пространство имен для комнаты чата может быть "Комнаты", а GUID может быть идентификатором GUID RoomGrain.

Здесь мы используем GUID некоторых известных комнат чата. OnNextAsync Используя метод потока, мы можем отправить данные в него. Давайте сделаем это внутри таймера, используя случайные числа. Вы также можете использовать любой другой тип данных для потока.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Подписка на данные потоковой передачи и получение

Для получения данных можно использовать неявные и явные подписки, которые подробно описаны в явных и неявных подписках. В этом примере используются неявные подписки, которые проще. Если тип зерна хочет неявно подписаться на поток, он использует атрибут [Неявное пространство_имен)].

В вашем случае определите следующее ReceiverGrain :

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Всякий раз, когда данные передаются в потоки пространства RANDOMDATAимен, как у нас есть в таймере, набор типов ReceiverGrain с тем же Guid потоком будет получать сообщение. Даже если активации зерна в настоящее время отсутствуют, среда выполнения автоматически создаст новую и отправит в нее сообщение.

Для этого необходимо завершить процесс подписки, задав метод OnNextAsync получения данных. Для этого мы ReceiverGrain должны назвать что-то подобное в его OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Все готово! Теперь единственное требование заключается в том, что что-то активирует создание производителя зерна, а затем он зарегистрирует таймер и начнет отправлять случайные инты всем заинтересованным сторонам.

Опять же, это руководство пропускает много деталей и подходит только для отображения большого рисунка. Ознакомьтесь с другими частями этого руководства и других ресурсов rX, чтобы получить хорошее представление о доступных и способах.

Реактивное программирование может быть очень мощным подходом к решению многих проблем. Например, можно использовать LINQ в подписчике для фильтрации номеров и делать все интересные вещи.

См. также

OrleansAPI программирования Потоки