Delen via


Orleans streaming-API's

Toepassingen communiceren met streams via API's die vergelijkbaar zijn met de bekende reactieve extensies (Rx) in .NET. Het belangrijkste verschil is dat Orleans stroomextensies asynchroon zijn, om de verwerking efficiënter te maken in Orleansgedistribueerde en schaalbare rekeninfrastructuur.

Asynchrone stream

Een toepassing begint met het gebruik van een streamprovider om een ingang naar een stream te krijgen. U kunt hier meer lezen over streamproviders, maar voor nu kunt u het beschouwen als een streamfactory waarmee implementers het gedrag en de semantiek van stromen kunnen aanpassen:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Een toepassing kan een verwijzing naar de streamprovider ophalen door de Grain.GetStreamProvider methode aan te roepen in een korrel of door de GrainClient.GetStreamProvider methode aan te roepen op de client.

Orleans.Streams.IAsyncStream<T> is een logische, sterk getypte ingang naar een virtuele stream. Het is vergelijkbaar met Orleans Grain Reference. Oproepen naar GetStreamProvider en GetStream zijn puur lokaal. De argumenten die moeten worden gebruikt GetStream , zijn een GUID en een extra tekenreeks die we een stroomnaamruimte noemen (die null kan zijn). Samen vormen de GUID en de naamruimtetekenreeks de stroomidentiteit (vergelijkbaar met de argumenten).IGrainFactory.GetGrain De combinatie van GUID en naamruimtetekenreeks biedt extra flexibiliteit bij het bepalen van stroomidentiteiten. Net als graan 7 kan bestaan binnen het type PlayerGrain Grain en een ander korrel 7 binnen het graantype ChatRoomGrainbestaat, kan Stream 123 bestaan met de stroomnaamruimte PlayerEventsStream en kan er een andere stream 123 bestaan binnen de stroomnaamruimte ChatRoomMessagesStream.

Produceren en gebruiken

IAsyncStream<T> implementeert zowel de IAsyncObserver<T> als IAsyncObservable<T> de interfaces. Op die manier kan een toepassing de stream gebruiken om nieuwe gebeurtenissen in de stream te produceren met behulp van Orleans.Streams.IAsyncObserver<T> of om u te abonneren op gebeurtenissen uit een stream en deze te gebruiken met behulp van Orleans.Streams.IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Als u gebeurtenissen in de stream wilt produceren, roept een toepassing alleen aan

await stream.OnNextAsync<T>(event)

Als u zich wilt abonneren op een stream, roept een toepassing aan

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

Het argument dat moet SubscribeAsync worden gebruikt, kan een object zijn waarmee de IAsyncObserver<T> interface of een combinatie van lambda-functies wordt geïmplementeerd om binnenkomende gebeurtenissen te verwerken. Er zijn meer opties SubscribeAsync beschikbaar via AsyncObservableExtensions klasse. SubscribeAsync retourneert een StreamSubscriptionHandle<T>, wat een ondoorzichtige handgreep is die kan worden gebruikt om zich af te melden voor de stream (vergelijkbaar met een asynchrone versie van IDisposable).

await subscriptionHandle.UnsubscribeAsync()

Het is belangrijk om te weten dat het abonnement een korrel heeft, niet voor activering. Zodra de graancode is geabonneerd op de stream, overschrijdt dit abonnement de levensduur van deze activering en blijft deze duurzaam totdat de graancode (mogelijk in een andere activering) expliciet wordt afgemeld. Dit is het hart van een abstractie van een virtuele stroom: niet alleen bestaan alle streams altijd, logisch, maar ook een streamabonnement is duurzaam en leeft buiten een bepaalde fysieke activering die het abonnement heeft gemaakt.

Multipliciteit

Een Orleans stroom kan meerdere producenten en meerdere consumenten hebben. Een bericht dat door een producent wordt gepubliceerd, wordt bezorgd bij alle consumenten die zijn geabonneerd op de stream voordat het bericht werd gepubliceerd.

Bovendien kan de consument zich meerdere keren abonneren op dezelfde stream. Telkens wanneer het zich abonneert, krijgt het een unieke StreamSubscriptionHandle<T>. Als een graan (of client) X keer is geabonneerd op dezelfde stream, ontvangt deze dezelfde gebeurtenis X keer, één keer voor elk abonnement. De consument kan zich ook afmelden voor een afzonderlijk abonnement. U kunt alle huidige abonnementen vinden door het volgende aan te roepen:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Herstellen van fouten

Als de producent van een stroom sterft (of zijn graan wordt gedeactiveerd), hoeft hij niets te doen. De volgende keer dat dit graan meer gebeurtenissen wil produceren, kan de stroom opnieuw worden verwerkt en nieuwe gebeurtenissen op dezelfde manier produceren.

Consumentenlogica is iets meer betrokken. Zoals we eerder hebben gezegd, is dit abonnement geldig zodra een consumentenkorrel is geabonneerd op een stream, totdat het graan expliciet wordt afgemeld. Als de consument van de stroom sterft (of het graan ervan gedeactiveerd) en er een nieuwe gebeurtenis wordt gegenereerd in de stroom, wordt het verbruiksinterval automatisch opnieuw geactiveerd (net zoals elke reguliere Orleans korrel wordt geactiveerd wanneer er een bericht naartoe wordt verzonden). Het enige wat de graancode nu moet doen, is door IAsyncObserver<T> de gegevens te verwerken. De consument moet verwerkingslogica opnieuw koppelen als onderdeel van de OnActivateAsync() methode. Hiervoor kan het volgende worden aangeroepen:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

De consument gebruikt de vorige ingang die deze heeft gekregen toen hij zich voor het eerst abonneert op 'cv-verwerking'. U ziet dat ResumeAsync alleen een bestaand abonnement wordt bijgewerkt met het nieuwe exemplaar van IAsyncObserver logica en niet verandert dat deze consument al is geabonneerd op deze stream.

Hoe wordt de consument oud subscriptionHandle? Er zijn twee opties. De consument heeft mogelijk de ingang behouden die is teruggegeven van de oorspronkelijke SubscribeAsync bewerking en kan deze nu gebruiken. Als de consument niet over de ingang beschikt, kan deze ook vragen om IAsyncStream<T> alle actieve abonnementshandgrepen door het volgende aan te roepen:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

De consument kan deze nu allemaal hervatten of zich afmelden voor sommigen, indien gewenst.

Tip

Als de consumentkorrel de IAsyncObserver<T> interface rechtstreeks implementeert (public class MyGrain<T> : Grain, IAsyncObserver<T>), moet het in theorie niet verplicht zijn om de IAsyncObserver interface opnieuw te koppelen en hoeft deze dus niet aan te roepen ResumeAsync. De streamingruntime moet automatisch kunnen achterhalen dat het graan al wordt geïmplementeerd en deze methoden alleen aanroept IAsyncObserver IAsyncObserver . De streaming-runtime biedt momenteel echter geen ondersteuning voor dit en de graancode moet nog steeds expliciet worden aangeroepen ResumeAsync, zelfs als de graanbewerking rechtstreeks wordt geïmplementeerd IAsyncObserver .

Expliciete en impliciete abonnementen

Een streamgebruiker moet zich standaard expliciet abonneren op de stream. Dit abonnement wordt meestal geactiveerd door een extern bericht dat het graan (of de client) ontvangt waarmee het abonnement moet worden geabonneerd. In een chatservice bijvoorbeeld wanneer een gebruiker deelneemt aan een chatruimte, ontvangt zijn graan een JoinChatGroup bericht met de naam van de chat, waardoor de gebruiker zich abonneert op deze chatstream.

Daarnaast Orleans bieden streams ook ondersteuning voor impliciete abonnementen. In dit model wordt het graan niet expliciet geabonneerd op de stream. Dit graan wordt automatisch, impliciet, op basis van de korrelidentiteit en een ImplicitStreamSubscriptionAttribute. De belangrijkste waarde van impliciete abonnementen zorgt ervoor dat de streamactiviteit automatisch de korrelactivering activeert (waardoor het abonnement wordt geactiveerd). Als bijvoorbeeld sms-streams zouden worden gebruikt om een stroom en een ander graanproces te produceren, moet de producent de identiteit van het verbruiksinterval kennen en er een graanoproep aan doen om het te laten abonneren op de stroom. Pas daarna kan het beginnen met het verzenden van gebeurtenissen. In plaats daarvan kan de producent met behulp van impliciete abonnementen gewoon beginnen met het produceren van gebeurtenissen naar een stream en wordt het verbruiksinterval automatisch geactiveerd en geabonneerd op de stream. In dat geval maakt de producent niet uit wie de gebeurtenissen leest

De grain-implementatie MyGrainType kan een kenmerk [ImplicitStreamSubscription("MyStreamNamespace")]declareren. Dit vertelt de streamingruntime dat wanneer een gebeurtenis wordt gegenereerd op een stream waarvan de identiteit GUID XXX en "MyStreamNamespace" naamruimte is, deze moet worden geleverd aan de korrel waarvan de identiteit XXX van het type MyGrainTypeis. Dat wil gezegd, de runtime wijst de stream <XXX, MyStreamNamespace> toe aan verbruiksinterval <XXX, MyGrainType>.

De aanwezigheid van ImplicitStreamSubscriptionzorgt ervoor dat de streamingruntime dit graan automatisch abonneert op een stream en de stream-gebeurtenissen hieraan levert. De graancode moet echter nog steeds de runtime vertellen hoe gebeurtenissen moeten worden verwerkt. In wezen moet het de IAsyncObserver. Wanneer het graan wordt geactiveerd, moet de graancode binnen OnActivateAsync daarom het volgende aanroepen:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Abonnementslogica schrijven

Hieronder ziet u de richtlijnen voor het schrijven van de abonnementslogica voor verschillende gevallen: expliciete en impliciete abonnementen, terugspoelen en niet-terugspoelen van streams. Het belangrijkste verschil tussen expliciete en impliciete abonnementen is dat voor impliciete korrel altijd precies één impliciet abonnement voor elke streamnaamruimte is; er is geen manier om meerdere abonnementen te maken (er is geen multipliciteit voor abonnementen), er is geen manier om u af te melden en de graanlogica hoeft altijd alleen de verwerkingslogica te koppelen. Dat betekent ook dat voor impliciete abonnementen nooit een abonnement hoeft te worden hervat. Aan de andere kant, voor expliciete abonnementen, moet u het abonnement hervatten, anders, als het graan zich opnieuw abonneert, leidt dit ertoe dat het graan meerdere keren wordt geabonneerd.

Impliciete abonnementen:

Voor impliciete abonnementen moet het graan zich nog steeds abonneren om de verwerkingslogica te koppelen. Dit kan in het verbruiksinterval worden gedaan door de IStreamSubscriptionObserver en IAsyncObserver<T> interfaces te implementeren, zodat het graan afzonderlijk van abonneren kan worden geactiveerd. Als u zich wilt abonneren op de stroom, maakt het graan een ingang en aanroepen await handle.ResumeAsync(this) in de OnSubscribed(...) bijbehorende methode.

Voor het verwerken van berichten wordt de methode geïmplementeerd voor het IAsyncObserver<T>.OnNextAsync(...) ontvangen van streamgegevens en een reekstoken. De methode kan ook ResumeAsync een set gemachtigden bevatten die de methoden van de IAsyncObserver<T> interface, onNextAsync, en onErrorAsynconCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Expliciete abonnementen:

Voor expliciete abonnementen moet een grain aanroepen SubscribeAsync om u te abonneren op de stream. Hiermee maakt u een abonnement en voegt u de verwerkingslogica toe. Het expliciete abonnement bestaat totdat het graan wordt afgemeld, dus als een grain wordt gedeactiveerd en opnieuw wordt geactiveerd, wordt het graan nog steeds expliciet geabonneerd, maar wordt er geen verwerkingslogica gekoppeld. In dit geval moet het graan de verwerkingslogica opnieuw koppelen. Om dat te doen, OnActivateAsyncmoet het graan eerst uitzoeken welke abonnementen het heeft, door aan te roepen IAsyncStream<T>.GetAllSubscriptionHandles(). Het graan moet worden uitgevoerd ResumeAsync op elke ingang die het wil blijven verwerken of UnsubscribeAsync op alle ingangen waarmee deze wordt uitgevoerd. Het korreltje kan eventueel ook het StreamSequenceToken argument opgeven voor de ResumeAsync aanroepen, waardoor dit expliciete abonnement vanaf dat token begint te gebruiken.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    } 
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Volgorde- en volgordetokens streamen

De volgorde van de levering van gebeurtenissen tussen een individuele producent en een individuele consument is afhankelijk van de streamprovider.

Met SMS bepaalt de producent expliciet de volgorde van gebeurtenissen die door de consument worden gezien door de manier waarop de producent deze publiceert. Standaard (als de optie voor sms-provider SimpleMessageStreamProviderOptions.FireAndForgetDelivery is ingesteld op false) en als de producent elke oproep wacht OnNextAsync , komen de gebeurtenissen binnen in FIFO-volgorde. In sms is het aan de producent om te bepalen hoe leveringsfouten moeten worden afgehandeld die worden aangegeven door een verbroken Task door de OnNextAsync oproep.

Azure Queue-streams garanderen geen FIFO-volgorde, omdat de onderliggende Azure-wachtrijen de volgorde in foutgevallen niet garanderen. (Ze garanderen wel FIFO-volgorde in mislukte uitvoeringen.) Wanneer een producent de gebeurtenis in Azure Queue produceert en de wachtrijbewerking mislukt, is het aan de producent om een andere wachtrij te proberen en later om te gaan met mogelijke dubbele berichten. Aan de leveringszijde verwijdert de Orleans Streaming-runtime de gebeurtenis uit de wachtrij en probeert deze te leveren voor verwerking aan consumenten. De Orleans streamingruntime verwijdert de gebeurtenis alleen uit de wachtrij na een geslaagde verwerking. Als de bezorging of verwerking mislukt, wordt de gebeurtenis niet uit de wachtrij verwijderd en wordt deze later automatisch opnieuw weergegeven in de wachtrij. De streamingruntime probeert het opnieuw te leveren, waardoor de FIFO-bestelling mogelijk wordt onderbroken. Het bovenstaande gedrag komt overeen met de normale semantiek van Azure Queues.

Door toepassing gedefinieerde volgorde: om de bovenstaande bestelproblemen af te handelen, kan een toepassing desgewenst de volgorde opgeven. Dit wordt bereikt via een StreamSequenceToken, een ondoorzichtig IComparable object dat kan worden gebruikt om gebeurtenissen te orden. Een producent kan een optionele StreamSequenceToken aanroep OnNext doorgeven. Dit StreamSequenceToken wordt doorgegeven aan de consument en wordt samen met de gebeurtenis geleverd. Op die manier kan een toepassing de volgorde ervan onafhankelijk van de streamingruntime redeneren en reconstrueren.

Terugspoelen van stromen

Sommige streams staan alleen toe dat een toepassing zich abonneert op deze stromen, te beginnen op het laatste tijdstip, terwijl andere streams 'terug in de tijd' toestaan. De laatste mogelijkheid is afhankelijk van de onderliggende wachtrijtechnologie en de specifieke streamprovider. Azure Queues staan bijvoorbeeld alleen het gebruik van de meest recente enqueued gebeurtenissen toe, terwijl EventHub het afspelen van gebeurtenissen vanaf een willekeurig tijdstip toestaat (tot een bepaalde verlooptijd). Streams die teruggaan in de tijd ondersteunen, worden herstelbare streams genoemd.

De consument van een terugspoelende stream kan een StreamSequenceToken aanroep SubscribeAsync doorgeven. De runtime levert gebeurtenissen vanaf die StreamSequenceToken. Een null-token betekent dat de consument gebeurtenissen wil ontvangen die beginnen met de nieuwste versie.

De mogelijkheid om een stream terug te spoelen is erg handig in herstelscenario's. Denk bijvoorbeeld aan een korrel die zich abonneert op een stream en periodiek de status ervan controleert samen met het meest recente reekstoken. Bij het herstellen van een fout kan het graan zich opnieuw abonneren op dezelfde stroom vanaf het meest recente controlepunttoken, waardoor er geen gebeurtenissen verloren gaan die zijn gegenereerd sinds het laatste controlepunt.

De Event Hubs-provider kan worden terugspoelen. U vindt de bijbehorende code op GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. SMS- en Azure Queue-providers kunnen niet worden terugspoelen.

Stateless automatisch uitgeschaalde verwerking

Orleans Streaming is standaard bedoeld ter ondersteuning van een groot aantal relatief kleine streams, elk verwerkt door een of meer stateful korrels. Gezamenlijk wordt de verwerking van alle stromen samen geshard tussen een groot aantal reguliere (stateful) korrels. De toepassingscode bepaalt deze sharding door stream-id's en graan-id's toe te wijzen en expliciet te abonneren. Het doel is sharded stateful verwerking.

Er is echter ook een interessant scenario van automatisch uitgeschaalde staatloze verwerking. In dit scenario heeft een toepassing een klein aantal streams (of zelfs één grote stroom) en het doel is staatloze verwerking. Een voorbeeld is een globale stroom gebeurtenissen, waarbij de verwerking betrekking heeft op het decoderen van elke gebeurtenis en het mogelijk doorsturen naar andere streams voor verdere stateful verwerking. De stateless uitgeschaalde stroomverwerking kan worden ondersteund Orleans via StatelessWorkerAttribute korrels.

Huidige status van stateless automatisch uitgeschaalde verwerking: dit is nog niet geïmplementeerd. Een poging om zich vanaf een korrel op een StatelessWorker stream te abonneren, leidt tot niet-gedefinieerd gedrag. We overwegen deze optie te ondersteunen.

Korrels en Orleans clients

Orleans stromen werken gelijkmatig over korrels en Orleans clients. Dat wil gezegd, dezelfde API's kunnen binnen een graan en in een Orleans client worden gebruikt om gebeurtenissen te produceren en te gebruiken. Dit vereenvoudigt de toepassingslogica aanzienlijk, waardoor speciale API's aan de clientzijde, zoals Grain Observers, redundant worden.

Volledig beheerde en betrouwbare streaming pub-sub

Voor het bijhouden van streamabonnementen gebruikt Orleans u een runtimeonderdeel met de naam Streaming Pub-Sub , dat fungeert als een rendezvous-punt voor streamgebruikers en streamproducenten. Pub-sub houdt alle streamabonnementen bij en houdt deze vast en komt overeen met streamgebruikers met streamproducenten.

Toepassingen kunnen kiezen waar en hoe de Pub-Sub-gegevens worden opgeslagen. Het Pub-Sub-onderdeel zelf wordt geïmplementeerd als korrels (genaamd PubSubRendezvousGrain), die declaratieve persistentie gebruiken Orleans . PubSubRendezvousGrain maakt gebruik van de opslagprovider met de naam PubSubStore. Net als bij elke korrel kunt u een implementatie voor een opslagprovider aanwijzen. Voor Streaming Pub-Sub kunt u de implementatie van de tijdens de PubSubStore siloconstructie wijzigen met behulp van de silohostbouwer:

Met het volgende configureert u pub-sub om de status ervan op te slaan in Azure-tabellen.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

Op die manier worden pub-subgegevens duurzaam opgeslagen in Azure Table. Voor de eerste ontwikkeling kunt u ook geheugenopslag gebruiken. Naast Pub-Sub levert de Orleans Streaming Runtime gebeurtenissen van producenten aan consumenten, beheert alle runtimeresources die zijn toegewezen aan actief gebruikte streams en verzamelt transparant garbage runtime-resources van ongebruikte streams.

Configuratie

Als u streams wilt gebruiken, moet u streamproviders inschakelen via de silohost of clusterclientbouwer. Hier vindt u meer informatie over streamproviders. Voorbeeld van installatie van streamprovider:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Zie ook

Orleans Stream-providers