Dela via


Orleans snabbstart för direktuppspelning

Den här guiden visar ett snabbt sätt att konfigurera och använda Orleans Flöden. Mer information om strömningsfunktionerna finns i andra delar av den här dokumentationen.

Nödvändiga konfigurationer

I den här guiden använder du en minnesbaserad ström som använder kornmeddelanden för att skicka dataström till prenumeranter. Du kommer att använda lagringsprovidern i minnet för att lagra listor över prenumerationer. Användning av minnesbaserade mekanismer för strömning och lagring är endast avsett för lokal utveckling och testning och är inte avsett för produktionsmiljöer.

På silon, där silo är ett ISiloBuilder, -samtal AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

På klusterklienten, där client är ett IClientBuilder, -anrop AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

I den här guiden använder vi en enkel meddelandebaserad ström som använder kornmeddelanden för att skicka dataström till prenumeranter. Vi använder lagringsprovidern i minnet för att lagra listor över prenumerationer, så det är inte ett klokt val för verkliga produktionsprogram.

På silon, där hostBuilder är ett ISiloHostBuilder, -samtal AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

På klusterklienten, där clientBuilder är ett IClientBuilder, -anrop AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Kommentar

Som standard anses meddelanden som skickas via Den enkla meddelandeströmmen vara oföränderliga och kan skickas med referens till andra korn. Om du vill inaktivera det här beteendet måste du konfigurera SMS-providern för att stänga av SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Du kan skapa strömmar, skicka data med dem som producenter och även ta emot data som prenumeranter.

Skapa händelser

Det är relativt enkelt att skapa händelser för strömmar. Du bör först få åtkomst till den dataströmprovider som du definierade i konfigurationen tidigare ("StreamProvider") och sedan välja en ström och skicka data till den.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Det är relativt enkelt att skapa händelser för strömmar. Du bör först få åtkomst till den dataströmprovider som du definierade i konfigurationen tidigare ("SMSProvider") och sedan välja en ström och skicka data till den.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Som du ser har vår ström ett GUID och ett namnområde. Detta gör det enkelt att identifiera unika strömmar. Namnområdet för ett chattrum kan till exempel vara "Rum" och GUID kan vara den ägande RoomGrains GUID.

Här använder vi GUID för ett känt chattrum. OnNextAsync Med hjälp av dataströmmens metod kan vi skicka data till den. Vi gör det i en timer med hjälp av slumpmässiga tal. Du kan också använda andra datatyper för dataströmmen.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Prenumerera på och ta emot strömmande data

För att ta emot data kan du använda implicita och explicita prenumerationer, som beskrivs mer detaljerat i explicita och implicita prenumerationer. I det här exemplet används implicita prenumerationer, vilket är enklare. När en korntyp implicit vill prenumerera på en dataström använder den attributet [ImplicitStreamSubscription(namespace)].

För ditt fall definierar du en ReceiverGrain så här:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

När data skickas till strömmarna i namnområdet RANDOMDATA, som vi har i timern, får ett korn av typ ReceiverGrain med samma Guid dataström meddelandet. Även om det för närvarande inte finns några aktiveringar av kornet skapar körningen automatiskt en ny och skickar meddelandet till den.

För att detta ska fungera måste vi slutföra prenumerationsprocessen genom att ange vår OnNextAsync metod för att ta emot data. För att göra det bör vi ReceiverGrain kalla något liknande i sin OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Nu är det klart! Nu är det enda kravet att något utlöser producentkornets skapande, och sedan registrerar det timern och börjar skicka slumpmässiga ints till alla berörda parter.

Återigen hoppar den här guiden över massor av detaljer och är bara bra för att visa helheten. Läs andra delar av den här handboken och andra resurser på RX för att få en god förståelse för vad som är tillgängligt och hur.

Reaktiv programmering kan vara en mycket kraftfull metod för att lösa många problem. Du kan till exempel använda LINQ i prenumeranten för att filtrera nummer och göra alla möjliga intressanta saker.

Se även

OrleansFlöden programmerings-API:er