Inicio rápido de streaming de Orleans
Esta guía le mostrará una manera rápida de configurar y usar secuencias de Orleans. Para más información sobre los detalles de las características de streaming, lea otras partes de esta documentación.
Configuraciones necesarias
En esta guía, se usará una secuencia basada en memoria que usa mensajería específica para enviar datos de flujo a los suscriptores. Se usará el proveedor de almacenamiento en memoria para almacenar listas de suscripciones. El uso de mecanismos basados en memoria para el streaming y el almacenamiento solo está pensado para el desarrollo y pruebas a nivel local, no para entornos de producción.
En el silo, donde silo
es un elemento ISiloBuilder, llame a AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
En el cliente del clúster, donde client
es un elemento IClientBuilder, llame a AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
En esta guía, usaremos una secuencia sencilla basada en mensajes que usa mensajería específica para enviar datos de flujo a los suscriptores. Usaremos el proveedor de almacenamiento en memoria para almacenar listas de suscripciones, por lo que no es una opción inteligente para las aplicaciones de producción reales.
En el silo, donde hostBuilder
es un elemento ISiloHostBuilder
, llame a AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
En el cliente del clúster, donde clientBuilder
es un elemento IClientBuilder
, llame a AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Nota
De forma predeterminada, los mensajes que se pasan sobre la secuencia de mensajes simples se consideran inmutables y pueden pasarse por referencia a otros granos. Para desactivar este comportamiento, debe configurar el proveedor de SMS para desactivarlo SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Puede crear flujos, enviar datos con ellos como productores y también recibir datos como suscriptores.
Generación de eventos
Es relativamente fácil generar eventos para secuencias. Primero debe obtener acceso al proveedor de flujos que ha definido en la configuración anterior ("StreamProvider"
) y, después, elegir una secuencia e insertar datos en ella.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Es relativamente fácil generar eventos para secuencias. Primero debe obtener acceso al proveedor de flujos que ha definido en la configuración anterior ("SMSProvider"
) y, después, elegir una secuencia e insertar datos en ella.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Como puede ver, nuestra secuencia tiene un GUID y un espacio de nombres. Esto facilitará la identificación de flujos únicos. Por ejemplo, el espacio de nombres de un salón de chat puede ser "Salas" y el GUID puede ser el GUID de RoomGrain propietario.
Aquí usamos el GUID de algún salón de chat conocido. Con el método OnNextAsync
de la secuencia, podemos insertar datos en él. Vamos a hacerlo dentro de un temporizador, usando números aleatorios. También puede usar cualquier otro tipo de datos para la secuencia.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Suscripción a datos de streaming y recepción de estos
Para recibir datos, puede usar suscripciones implícitas y explícitas, que se describen con más detalle en Suscripciones explícitas e implícitas. En este ejemplo se usan suscripciones implícitas, que son más fáciles. Cuando un tipo de grano quiere suscribirse implícitamente a una secuencia, usa el atributo [ImplicitStreamSubscription(espacio de nombres)].
En su caso, debe definir un elemento ReceiverGrain
similar al siguiente:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Cada vez que los datos se insertan en las secuencias del espacio de nombres RANDOMDATA
, como tenemos en el temporizador, un grano de tipo ReceiverGrain
con el mismo Guid
de la secuencia recibirá el mensaje. Incluso si no existen activaciones del grano actualmente, el tiempo de ejecución creará automáticamente uno nuevo y le enviará el mensaje.
Para que esto funcione, es necesario completar el proceso de suscripción estableciendo nuestro método OnNextAsync
para recibir datos. Para ello, nuestro ReceiverGrain
debe llamar a algo parecido a esto en su OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
¡Ya está a punto! Ahora el único requisito es que algo desencadene la creación del grano del productor y, después, registrará el temporizador y empezará a enviar ints aleatorios a todas las partes interesadas.
De nuevo, esta guía omite muchos detalles y solo es bueno para mostrar la imagen general. Lea otras partes de este manual y otros recursos en RX para obtener una buena comprensión de lo que está disponible y cómo.
La programación reactiva puede ser un enfoque muy eficaz para resolver muchos problemas. Por ejemplo, podría usar LINQ en el suscriptor para filtrar números y hacer todo tipo de cosas interesantes.