Orleans akış API'leri
Uygulamalar, .NET'teki iyi bilinen Reaktif Uzantılara (Rx) çok benzeyen API'ler aracılığıyla akışlarla etkileşim kurar. Temel fark, Orleans ' dağıtılmış ve ölçeklenebilir işlem dokusunda Orleansişlemeyi daha verimli hale getirmek için akış uzantılarının zaman uyumsuz olmasıdır.
Zaman uyumsuz akış
Bir uygulama, akış sağlayıcısını kullanarak bir akış tutamacını almak için başlar. Burada akış sağlayıcıları hakkında daha fazla bilgi edinebilirsiniz, ancak şimdilik bunu uygulayıcıların akış davranışını ve semantiğini özelleştirmesine olanak tanıyan bir akış fabrikası olarak düşünebilirsiniz:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Bir uygulama, bir dilimin içinde yöntemini çağırarak Grain.GetStreamProvider veya istemcideyken yöntemini çağırarak akış sağlayıcısına GrainClient.GetStreamProvider başvuru alabilir.
Orleans.Streams.IAsyncStream<T> bir sanal akış için mantıksal, kesin olarak belirlenmiş bir tanıtıcıdır. Ruhen Grain Reference'a Orleans benzer. GetStreamProvider
ve GetStream
çağrıları tamamen yereldir. için GetStream
bağımsız değişkenler, bir GUID ve akış ad alanı (null olabilir) olarak adlandırdığımız ek bir dizedir. GUID ve ad alanı dizesi birlikte akış kimliğini (bağımsız değişkenlerine IGrainFactory.GetGrainbenzer şekilde) oluşturur. GUID ve ad alanı dizesinin birleşimi, akış kimliklerini belirlemede ek esneklik sağlar. Grain 7'nin Grain türünde PlayerGrain
ve farklı bir tanecik 7'nin tanecik türü ChatRoomGrain
içinde mevcut olabileceği gibi, Akış 123 akış ad alanıyla ve farklı bir akış 123 de akış ad PlayerEventsStream
alanında ChatRoomMessagesStream
bulunabilir.
Üretim ve tüketim
IAsyncStream<T> hem hem de IAsyncObserver<T> IAsyncObservable<T> arabirimlerini uygular. Bu şekilde, bir uygulama kullanarak akışta yeni olaylar üretmek veya kullanarak Orleans.Streams.IAsyncObserver<T>
bir akışa abone olmak ve akıştan olayları kullanmak için akışı Orleans.Streams.IAsyncObservable<T>
kullanabilir.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Akışta olaylar üretmek için bir uygulama yalnızca
await stream.OnNextAsync<T>(event)
Bir akışa abone olmak için bir uygulama şunu çağırır:
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
için bağımsız değişkeni SubscribeAsync , arabirimini uygulayan IAsyncObserver<T> bir nesne veya gelen olayları işlemek için lambda işlevlerinin bir bileşimi olabilir. için SubscribeAsync
daha fazla seçenek sınıfı aracılığıyla AsyncObservableExtensions kullanılabilir. SubscribeAsync
akış aboneliğini kaldırmak için kullanılabilecek opak bir tanıtıcı olan bir StreamSubscriptionHandle<T>döndürür (ruhen zaman uyumsuz sürümüne benzer).IDisposable
await subscriptionHandle.UnsubscribeAsync()
Aboneliğin etkinleştirme için değil, bir dilim için olduğunu unutmayın. Tahıl kodu akışa abone olduktan sonra, bu abonelik bu etkinleştirmenin ömrünü aşıyor ve tahıl kodu (potansiyel olarak farklı bir etkinleştirmede) açıkça aboneliği kaldırana kadar sonsuza kadar dayanıklı kalıyor. Bu, sanal akış soyutlamasının kalbidir: yalnızca tüm akışlar mantıksal olarak mevcut değildir, aynı zamanda bir akış aboneliği de dayanıklıdır ve aboneliği oluşturan belirli bir fiziksel etkinleştirmenin ötesindedir.
Çokluk
Bir Orleans akışın birden çok üreticisi ve birden çok tüketicisi olabilir. Bir üretici tarafından yayımlanan bir ileti, ileti yayımlanmadan önce akışa abone olan tüm tüketicilere teslim edilir.
Buna ek olarak, tüketici aynı akışa birden çok kez abone olabilir. Her abone olduğunda benzersiz StreamSubscriptionHandle<T>bir alır. Bir taneli (veya istemci) aynı akışa X kez abone olursa, her abonelik için bir kez olmak üzere X kez aynı olayı alır. Tüketici ayrıca bireysel abonelik aboneliğinden de çıkabilir. Şu çağrıyı yaparak tüm geçerli aboneliklerini bulabilir:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Hatalardan kurtarma
Bir akışın üreticisi ölürse (veya tahılı devre dışı bırakılırsa), yapması gereken bir şey yoktur. Bu tahıl daha fazla olay üretmek istediğinde akış tutamacını yeniden alabilir ve aynı şekilde yeni olaylar üretebilir.
Tüketici mantığı biraz daha karmaşıktır. Daha önce de belirttiğimiz gibi, bir tüketici dilimi bir akışa abone olduktan sonra, bu abonelik açıkça abonelikten kaldırılana kadar geçerlidir. Akışın tüketicisi ölürse (veya tahılı devre dışı bırakılırsa) ve akışta yeni bir olay oluşturulursa, tüketici dilimi otomatik olarak yeniden etkinleştirilir (bir ileti gönderildiğinde her normal Orleans tanecik gibi otomatik olarak etkinleştirilir). Tahıl kodunun şu anda yapması gereken tek şey, verileri işlemek için bir IAsyncObserver<T> sağlamaktır. Tüketicinin yöntemin bir parçası olarak işleme mantığını yeniden eklemesi OnActivateAsync() gerekir. Bunu yapmak için şu çağrıyı yapabilir:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Tüketici, "işlemeye devam" için ilk abone olduğunda aldığı önceki tanıtıcıyı kullanır. Mevcut bir aboneliği yalnızca yeni mantık örneğiyle güncelleştirdiğini ResumeAsync ve bu tüketicinin IAsyncObserver
bu akışa zaten abone olduğu gerçeğini değiştirmediğini unutmayın.
Tüketici nasıl eskir subscriptionHandle
? 2 seçenek vardır. Tüketici, özgün SubscribeAsync
işlemden geri verilen tanıtıcıyı kalıcı hale getirmiş olabilir ve şimdi kullanabilir. Alternatif olarak, tüketici tanıtıcıya sahip değilse şu çağrıyı IAsyncStream<T>
yaparak tüm etkin abonelik tanıtıcılarını isteyebilir:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Tüketici artık tüm bunları sürdürebilir veya isterse bazılarından aboneliğini kaldırabilir.
İpucu
Tüketici hub'ı arabirimini IAsyncObserver<T> doğrudan uygularsa (public class MyGrain<T> : Grain, IAsyncObserver<T>
), teoride yeniden eklemek IAsyncObserver
gerekmemelidir ve bu nedenle çağrısı ResumeAsync
gerekmez. Akış çalışma zamanı, tahılın zaten uygulandığını IAsyncObserver
ve yalnızca bu IAsyncObserver
yöntemleri çağıracağını otomatik olarak anlayabilecektir. Ancak, akış çalışma zamanı şu anda bunu desteklememektedir ve tanecik doğrudan uygulansa IAsyncObserver
bile tahıl kodunun açıkça çağrısı ResumeAsync
gerekir.
Açık ve örtük abonelikler
Varsayılan olarak, bir akış tüketicisinin akışa açıkça abone olması gerekir. Bu abonelik genellikle, tahılın (veya istemcinin) aldığı ve abone olmasını bildiren bir dış ileti tarafından tetiklenebilir. Örneğin, bir sohbet hizmetinde bir kullanıcı sohbet odasına katıldığında tahılı sohbet adını içeren bir JoinChatGroup
ileti alır ve bu da kullanıcı diliminin bu sohbet akışına abone olmasına neden olur.
Ayrıca, Orleans akışlar örtük abonelikleri de destekler. Bu modelde, tahıl açıkça akışa abone olmaz. Bu tanecik, yalnızca tanecik kimliğine ve bir ImplicitStreamSubscriptionAttributeöğesine göre örtük olarak otomatik olarak abone olur. Örtük aboneliklerin ana değeri, akış etkinliğinin tahıl etkinleştirmesini (dolayısıyla aboneliği tetikleme) otomatik olarak tetiklesine izin vermektir. Örneğin SMS akışlarını kullanarak, bir tane bir akış üretmek isterse ve başka bir tanecik bu akışı işlemek isterse, üreticinin tüketici diliminin kimliğini bilmesi ve akışa abone olmasını söyleyen bir taneli çağrı yapması gerekir. Ancak bundan sonra olayları göndermeye başlayabilir. Bunun yerine, örtük abonelikleri kullanarak üretici bir akışa olay üretmeye başlayabilir ve tüketici dilimi otomatik olarak etkinleştirilip akışa abone olur. Bu durumda, yapımcı olayları kimin okuduğuyla hiç ilgilenmez
Grain uygulaması MyGrainType
bir özniteliğini [ImplicitStreamSubscription("MyStreamNamespace")]
bildirebilir. Bu, akış çalışma zamanına kimliği GUID XXX ve "MyStreamNamespace"
ad alanı olan bir akışta bir olay oluşturulduğunda, türü XXX MyGrainType
olan taneciye teslim edilmesi gerektiğini bildirir. Başka bir ifadeyle, çalışma zamanı akışı <XXX, MyStreamNamespace>
tüketici dilimine eşler <XXX, MyGrainType>
.
varlığı ImplicitStreamSubscription
, akış çalışma zamanının bu dilimi bir akışa otomatik olarak abone olmasına ve akış olaylarını ona teslim etmesine neden olur. Ancak, tahıl kodunun yine de çalışma zamanına olayların nasıl işlenmesini istediğini bildirmesi gerekir. Temelde, öğesini eklemesi IAsyncObserver
gerekir. Bu nedenle, tahıl etkinleştirildiğinde, içindeki OnActivateAsync
tahıl kodunun şunları çağırması gerekir:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Abonelik mantığı yazma
Aşağıda çeşitli durumlar için abonelik mantığının nasıl yazılabileceğine ilişkin yönergeler verilmiştir: açık ve örtük abonelikler, geri alınabilen ve geri alınamayan akışlar. Açık ve örtük abonelikler arasındaki temel fark, örtük olarak her akış ad alanı için her zaman tam olarak bir örtük aboneliğe sahip olmasıdır; birden çok abonelik oluşturmanın bir yolu yoktur (abonelik çokluğu yoktur), aboneliği kaldırmanın bir yolu yoktur ve tahıl mantığının her zaman yalnızca işleme mantığını eklemesi gerekir. Bu aynı zamanda örtük abonelikler için hiçbir zaman bir aboneliği sürdürmeye gerek olmadığı anlamına gelir. Öte yandan, açık abonelikler için aboneliğin sürdürülmesi gerekir, aksi takdirde tahıl yeniden abone olursa tanenin birden çok kez abone olmasına neden olur.
Örtük abonelikler:
Örtük abonelikler için, işleme mantığını eklemek için tahılın yine de abone olması gerekir. Bu, ve IAsyncObserver<T>
arabirimleri uygulanarak IStreamSubscriptionObserver
ve arabirimleri uygulanarak, tahılın aboneden ayrı olarak etkinleştirilmesini sağlayarak tüketici diliminde yapılabilir. Akışa abone olmak için, tahıl bir tanıtıcı oluşturur ve yöntemini çağırır await handle.ResumeAsync(this)
OnSubscribed(...)
.
İletileri işlemek için, IAsyncObserver<T>.OnNextAsync(...)
akış verilerini ve bir sıra belirtecini almak için yöntemi uygulanır. Alternatif olarak, ResumeAsync
yöntemi , ve onCompletedAsync
arabiriminin IAsyncObserver<T>
onNextAsync
onErrorAsync
yöntemlerini temsil eden bir temsilci kümesi alabilir.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Açık abonelikler:
Açık abonelikler için, akışa abone olmak için bir dilimin çağrısı SubscribeAsync
yapması gerekir. Bu, bir abonelik oluşturur ve işleme mantığını ekler. Açık abonelik, dilim aboneliği kaldırılana kadar mevcut olur, bu nedenle bir tane devre dışı bırakılır ve yeniden etkinleştirilirse, tahıl yine de açıkça abone olur, ancak hiçbir işleme mantığı eklenmez. Bu durumda, tahılın işleme mantığını yeniden eklemesi gerekir. Bunu yapmak için, öğesinin içinde OnActivateAsync
ilk olarak öğesini çağırarak IAsyncStream<T>.GetAllSubscriptionHandles()hangi aboneliklere sahip olduğunu öğrenmesi gerekir. Tahıl işlemeye devam etmek istediği her tanıtıcıda yürütülmelidir ResumeAsync
veya işlenmek üzere olduğu tüm tanıtıcılarda UnsubscribeAsync. Tahıl isteğe bağlı olarak öğesini çağrılara ResumeAsync
bağımsız değişken olarak belirtebilir StreamSequenceToken
ve bu da bu açık aboneliğin bu belirteçten kullanmaya başlamasına neden olur.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Akış sırası ve sıralı belirteçler
Tek bir üretici ile bireysel bir tüketici arasındaki olay tesliminin sırası akış sağlayıcısına bağlıdır.
SMS ile üretici, tüketici tarafından görülen olayların sırasını üreticinin yayımlama şeklini denetleyerek açıkça denetler. Varsayılan olarak (SMS sağlayıcısı seçeneği SimpleMessageStreamProviderOptions.FireAndForgetDelivery false olarak ayarlandıysa) ve üretici her OnNextAsync
çağrıyı bekliyorsa, olaylar FIFO sırasına göre gelir. SMS'de üreticinin, çağrı tarafından döndürülen bozuk Task
bir hatayla belirtilecek teslim hatalarını nasıl işleyeceklerine karar vermeleri OnNextAsync
gerekir.
Temel alınan Azure Kuyrukları hata durumlarında sırayı garanti etmediğinden Azure Kuyruk akışları FIFO sırasını garanti etmemektedir. (Hatasız yürütmelerde FIFO sırasını garanti eder.) Bir üretici olayı Azure Kuyruk'a ürettiğinde, kuyruk işlemi başarısız olursa başka bir kuyruk denemesi ve daha sonra olası yinelenen iletilerle ilgilenmek üreticiye bağlıdır. Teslim tarafında Akış Orleans çalışma zamanı, olayı kuyruktan kaldırır ve tüketicilere işlenmek üzere teslim etmeye çalışır. Orleans Streaming çalışma zamanı, yalnızca başarılı bir işlemden sonra olayı kuyruktan siler. Teslim veya işleme başarısız olursa olay kuyruktan silinmez ve daha sonra kuyrukta otomatik olarak yeniden görünür. Streaming çalışma zamanı yeniden teslim etmeyi deneyecek ve bu nedenle FIFO sırasını bozabilecek. Yukarıdaki davranış, Azure Kuyruklarının normal semantiğiyle eşleşir.
Uygulama Tanımlı Sipariş: Yukarıdaki sıralama sorunlarıyla başa çıkmak için, bir uygulama isteğe bağlı olarak sıralamasını belirtebilir. Bu, olayları sıralamak için kullanılabilecek opak IComparable bir nesne olan aracılığıyla elde StreamSequenceTokenedilir. Bir üretici isteğe bağlı StreamSequenceToken
bir çağrıyı OnNext
geçirebilir. Bu StreamSequenceToken
, tüketiciye geçirilecek ve olayla birlikte teslim edilecek. Bu şekilde, bir uygulama akış çalışma zamanından bağımsız olarak sırasını gerekçelendirebilir ve yeniden oluşturabilir.
Geri sarılabilir akışlar
Bazı akışlar, bir uygulamanın yalnızca en son zaman noktasından başlayarak bunlara abone olmasını sağlarken, diğer akışlar "zamanda geriye gitme" olanağı sağlar. İkinci özellik, temel alınan kuyruğa alma teknolojisine ve belirli akış sağlayıcısına bağlıdır. Örneğin, Azure Kuyrukları yalnızca en son sıralanan olayların tüketilmesine izin verirken EventHub rastgele bir noktadan (süre sonu süresine kadar) olayların yeniden yürütülmesine izin verir. Zamanda geri dönmeyi destekleyen akışlara geri sarılabilir akışlar denir.
Geri sarılabilir bir akışın tüketicisi çağrısına SubscribeAsync
bir StreamSequenceToken
geçirebilir. Çalışma zamanı, bu StreamSequenceToken
öğesinden başlayarak olaylarını ona teslim eder. Null belirteç, tüketicinin en son sürümden başlayarak olayları almak istediği anlamına gelir.
Akışı geri sarma özelliği kurtarma senaryolarında çok yararlıdır. Örneğin, bir akışa abone olan ve durumunu en son sıra belirteci ile birlikte düzenli aralıklarla kontrol eden bir tanecik düşünün. Bir hatadan kurtarılırken, dilim aynı akışa en son denetim noktası oluşturulmuş sıra belirtecinden yeniden abone olabilir ve böylece son denetim noktasından bu yana oluşturulan olayları kaybetmeden kurtarılabilir.
Event Hubs sağlayıcısı geri sarılabilir. Kodunu GitHub'da bulabilirsiniz: Orleans/Azure/Orleans. Streaming.EventHubs. SMS ve Azure Kuyruk sağlayıcıları geri alınamaz.
Durum bilgisi olmayan otomatik olarak ölçeklendirilen işleme
Varsayılan olarak Akış, Orleans her biri bir veya daha fazla durum bilgisi olan tanecik tarafından işlenen çok sayıda nispeten küçük akışı destekleyecek şekilde hedeflenmiştir. Toplu olarak, tüm akışların birlikte işlenmesi çok sayıda normal (durum bilgisi olan) tane arasında parçalanır. Uygulama kodu, akış kimlikleri ve tanecik kimlikleri atayarak ve açıkça abone olarak bu parçalanmayı denetler. Amaç, durum bilgisi olan parçalı işlemedir.
Ancak, otomatik olarak ölçeklendirilen durum bilgisi olmayan işlemenin ilginç bir senaryosu da vardır. Bu senaryoda, bir uygulamanın az sayıda akışı (hatta bir büyük akışı) vardır ve hedef durum bilgisi olmayan işlemedir. Buna örnek olarak, işlemenin her olayın kodunu çözmeyi ve durum bilgisi olan daha fazla işlem için diğer akışlara iletmeyi kapsadığı genel bir olay akışı örnek olarak verilmiştir. Durum bilgisi olmayan ölçeklendirilen akış işleme, dilimler aracılığıyla StatelessWorkerAttribute içinde Orleans desteklenebilir.
Durum Bilgisi Olmayan Otomatik Olarak Ölçeği Genişletilen İşlemenin Geçerli Durumu: Bu henüz uygulanmadı. Bir dilimden StatelessWorker
bir akışa abone olma girişimi tanımsız davranışa neden olur. Bu seçeneği desteklemeyi düşünüyoruz.
Tahıllar ve Orleans istemciler
Orleans Akışlar, tahıllar ve Orleans istemciler arasında düzgün çalışır. Başka bir ifadeyle, aynı API'ler bir tanecik içinde ve istemcide Orleans olayları üretmek ve kullanmak için kullanılabilir. Bu, uygulama mantığını büyük ölçüde basitleştirerek Grain Observers gibi özel istemci tarafı API'lerinin yedekli olmasını sağlar.
Tam olarak yönetilen ve güvenilir akış pub-sub
Akış aboneliklerini izlemek için, Orleans akış tüketicileri ve akış üreticileri için bir buluşma noktası işlevi görecek Streaming Pub-Sub adlı bir çalışma zamanı bileşeni kullanır. Pub-sub tüm akış aboneliklerini izler ve kalıcı hale getirerek akış tüketicilerini akış üreticileriyle eşleştirir.
Uygulamalar Pub-Sub verilerinin nerede ve nasıl depolandığını seçebilir. Pub-Sub bileşeninin kendisi, bildirim temelli kalıcılık kullanan Orleans tanecikler (olarak adlandırılırPubSubRendezvousGrain
) olarak uygulanır. PubSubRendezvousGrain
adlı PubSubStore
depolama sağlayıcısını kullanır. Her dilimde olduğu gibi, bir depolama sağlayıcısı için bir uygulama belirleyebilirsiniz. Streaming Pub-Sub için, silo konak oluşturucusunu kullanarak silo yapım zamanında uygulamasını PubSubStore
değiştirebilirsiniz:
Aşağıda Pub-Sub, durumunu Azure tablolarında depoacak şekilde yapılandırılır.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Bu şekilde Pub-Sub verileri Azure Tablosu'nda durabilir bir şekilde depolanır. İlk geliştirme için bellek depolamayı da kullanabilirsiniz. Pub-Sub'a ek olarak Streaming Runtime, Orleans üreticilerden tüketicilere olaylar sunar, etkin olarak kullanılan akışlara ayrılan tüm çalışma zamanı kaynaklarını yönetir ve kullanılmayan akışlardan çalışma zamanı kaynaklarını saydam bir şekilde çöp toplar.
Yapılandırma
Akışları kullanmak için silo konağı veya küme istemci oluşturucuları aracılığıyla akış sağlayıcılarını etkinleştirmeniz gerekir. Akış sağlayıcıları hakkında daha fazla bilgiyi buradan okuyabilirsiniz. Örnek akış sağlayıcısı kurulumu:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");