Compartir a través de


Inicio rápido de streaming de Orleans

Esta guía le mostrará una manera rápida de configurar y usar secuencias de Orleans. Para más información sobre los detalles de las características de streaming, lea otras partes de esta documentación.

Configuraciones necesarias

En esta guía, se usará una secuencia basada en memoria que usa mensajería específica para enviar datos de flujo a los suscriptores. Se usará el proveedor de almacenamiento en memoria para almacenar listas de suscripciones. El uso de mecanismos basados en memoria para el streaming y el almacenamiento solo está pensado para el desarrollo y pruebas a nivel local, no para entornos de producción.

En el silo, donde silo es un elemento ISiloBuilder, llame a AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

En el cliente del clúster, donde client es un elemento IClientBuilder, llame a AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

En esta guía, usaremos una secuencia sencilla basada en mensajes que usa mensajería específica para enviar datos de flujo a los suscriptores. Usaremos el proveedor de almacenamiento en memoria para almacenar listas de suscripciones, por lo que no es una opción inteligente para las aplicaciones de producción reales.

En el silo, donde hostBuilder es un elemento ISiloHostBuilder, llame a AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

En el cliente del clúster, donde clientBuilder es un elemento IClientBuilder, llame a AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Nota

De forma predeterminada, los mensajes que se pasan sobre la secuencia de mensajes simples se consideran inmutables y pueden pasarse por referencia a otros granos. Para desactivar este comportamiento, debe configurar el proveedor de SMS para desactivarlo SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Puede crear flujos, enviar datos con ellos como productores y también recibir datos como suscriptores.

Generación de eventos

Es relativamente fácil generar eventos para secuencias. Primero debe obtener acceso al proveedor de flujos que ha definido en la configuración anterior ("StreamProvider") y, después, elegir una secuencia e insertar datos en ella.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Es relativamente fácil generar eventos para secuencias. Primero debe obtener acceso al proveedor de flujos que ha definido en la configuración anterior ("SMSProvider") y, después, elegir una secuencia e insertar datos en ella.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Como puede ver, nuestra secuencia tiene un GUID y un espacio de nombres. Esto facilitará la identificación de flujos únicos. Por ejemplo, el espacio de nombres de un salón de chat puede ser "Salas" y el GUID puede ser el GUID de RoomGrain propietario.

Aquí usamos el GUID de algún salón de chat conocido. Con el método OnNextAsync de la secuencia, podemos insertar datos en él. Vamos a hacerlo dentro de un temporizador, usando números aleatorios. También puede usar cualquier otro tipo de datos para la secuencia.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Suscripción a datos de streaming y recepción de estos

Para recibir datos, puede usar suscripciones implícitas y explícitas, que se describen con más detalle en Suscripciones explícitas e implícitas. En este ejemplo se usan suscripciones implícitas, que son más fáciles. Cuando un tipo de grano quiere suscribirse implícitamente a una secuencia, usa el atributo [ImplicitStreamSubscription(espacio de nombres)].

En su caso, debe definir un elemento ReceiverGrain similar al siguiente:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Cada vez que los datos se insertan en las secuencias del espacio de nombres RANDOMDATA, como tenemos en el temporizador, un grano de tipo ReceiverGrain con el mismo Guid de la secuencia recibirá el mensaje. Incluso si no existen activaciones del grano actualmente, el tiempo de ejecución creará automáticamente uno nuevo y le enviará el mensaje.

Para que esto funcione, es necesario completar el proceso de suscripción estableciendo nuestro método OnNextAsync para recibir datos. Para ello, nuestro ReceiverGrain debe llamar a algo parecido a esto en su OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

¡Ya está a punto! Ahora el único requisito es que algo desencadene la creación del grano del productor y, después, registrará el temporizador y empezará a enviar ints aleatorios a todas las partes interesadas.

De nuevo, esta guía omite muchos detalles y solo es bueno para mostrar la imagen general. Lea otras partes de este manual y otros recursos en RX para obtener una buena comprensión de lo que está disponible y cómo.

La programación reactiva puede ser un enfoque muy eficaz para resolver muchos problemas. Por ejemplo, podría usar LINQ en el suscriptor para filtrar números y hacer todo tipo de cosas interesantes.

Consulte también

API de programación de secuencias de Orleans