Orleans API streaming
Aplikasi berinteraksi dengan aliran melalui API yang sangat mirip dengan Reactive Extensions (Rx) terkenal di .NET. Perbedaan utamanya adalah bahwa Orleans ekstensi streaming bersifat asinkron, untuk membuat pemrosesan lebih efisien dalam Orleans' kain komputasi terdistribusi dan dapat diskalakan.
Aliran asinkron
Aplikasi dimulai dengan menggunakan penyedia streaming untuk mendapatkan handel ke aliran. Anda dapat membaca lebih lanjut tentang penyedia streaming di sini, tetapi untuk saat ini, Anda dapat menganggapnya sebagai pabrik aliran yang memungkinkan pelaksana untuk menyesuaikan perilaku aliran dan semantik:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Aplikasi bisa mendapatkan referensi ke penyedia streaming baik dengan memanggil Grain.GetStreamProvider metode ketika berada di dalam butir, atau dengan memanggil GrainClient.GetStreamProvider metode ketika berada di klien.
Orleans.Streams.IAsyncStream<T> adalah handel logis yang diketik dengan kuat ke aliran virtual. Ini mirip dengan Referensi Orleans Biji-Bijian. Panggilan ke GetStreamProvider
dan GetStream
murni lokal. Argumen untuk GetStream
adalah GUID dan string tambahan yang kami sebut namespace aliran (yang bisa null). Bersama-sama GUID dan string namespace terdiri dari identitas aliran (mirip dengan argumen ke IGrainFactory.GetGrain). Kombinasi STRING GUID dan namespace memberikan fleksibilitas ekstra dalam menentukan identitas aliran. Sama seperti butir 7 mungkin ada dalam jenis PlayerGrain
Biji-bijian dan butir yang berbeda 7 mungkin ada dalam jenis ChatRoomGrain
biji-bijian , Stream 123 mungkin ada dengan namespace PlayerEventsStream
streaming dan aliran yang berbeda 123 mungkin ada dalam namespace ChatRoomMessagesStream
streaming .
Memproduksi dan mengonsumsi
IAsyncStream<T>IAsyncObserver<T> mengimplementasikan antarmuka dan IAsyncObservable<T> . Dengan begitu aplikasi dapat menggunakan aliran baik untuk menghasilkan peristiwa baru ke dalam aliran dengan menggunakan Orleans.Streams.IAsyncObserver<T>
atau untuk berlangganan dan mengonsumsi peristiwa dari aliran dengan menggunakan Orleans.Streams.IAsyncObservable<T>
.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Untuk menghasilkan peristiwa ke dalam aliran, aplikasi hanya memanggil
await stream.OnNextAsync<T>(event)
Untuk berlangganan streaming, aplikasi memanggil
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
Argumen untuk SubscribeAsync dapat menjadi objek yang mengimplementasikan IAsyncObserver<T> antarmuka atau kombinasi fungsi lambda untuk memproses peristiwa masuk. Opsi lainnya untuk SubscribeAsync
tersedia melalui AsyncObservableExtensions kelas. SubscribeAsync
StreamSubscriptionHandle<T>mengembalikan , yang merupakan handel buram yang dapat digunakan untuk berhenti berlangganan dari aliran (mirip dengan versi IDisposableasinkron ).
await subscriptionHandle.UnsubscribeAsync()
Penting untuk dicatat bahwa langganan adalah untuk biji-bijian, bukan untuk aktivasi. Setelah kode biji-bijian berlangganan aliran, langganan ini melampaui kehidupan aktivasi ini dan tetap tahan lama selamanya, sampai kode biji-bijian (berpotensi dalam aktivasi yang berbeda) secara eksplisit berhenti berlangganan. Ini adalah inti dari abstraksi aliran virtual: tidak hanya semua aliran selalu ada, logis, tetapi juga langganan aliran tahan lama dan hidup di luar aktivasi fisik tertentu yang membuat langganan.
Beberapa
Aliran Orleans mungkin memiliki beberapa produsen dan beberapa konsumen. Pesan yang diterbitkan oleh produsen akan dikirimkan ke semua konsumen yang berlangganan streaming sebelum pesan diterbitkan.
Selain itu, konsumen dapat berlangganan aliran yang sama beberapa kali. Setiap kali berlangganan, ia mendapatkan kembali yang unik StreamSubscriptionHandle<T>. Jika butir (atau klien) berlangganan X kali ke aliran yang sama, maka akan menerima peristiwa X kali yang sama, sekali untuk setiap langganan. Konsumen juga dapat berhenti berlangganan dari langganan individual. Ini dapat menemukan semua langganannya saat ini dengan memanggil:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Memulihkan dari kegagalan
Jika produsen aliran mati (atau biji-bijiannya dinonaktifkan), tidak ada yang perlu dilakukan. Lain kali biji-bijian ini ingin menghasilkan lebih banyak peristiwa yang bisa ditangani aliran lagi dan menghasilkan peristiwa baru dengan cara yang sama.
Logika konsumen sedikit lebih terlibat. Seperti yang kami katakan sebelumnya, setelah butir konsumen berlangganan streaming, langganan ini berlaku sampai grain secara eksplisit berhenti berlangganan. Jika konsumen aliran mati (atau biji-bijiannya dinonaktifkan) dan peristiwa baru dihasilkan pada aliran, butir konsumen akan secara otomatis diaktifkan kembali (sama seperti biji-bijian biasa Orleans yang diaktifkan secara otomatis ketika pesan dikirim ke dalamnya). Satu-satunya hal yang perlu dilakukan kode biji-bijian sekarang adalah menyediakan IAsyncObserver<T> untuk memproses data. Konsumen perlu melampirkan kembali logika pemrosesan sebagai bagian OnActivateAsync() dari metode . Untuk melakukannya, ia dapat memanggil:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Konsumen menggunakan handel sebelumnya yang didapatkannya ketika pertama kali berlangganan "lanjutkan pemrosesan". Perhatikan bahwa ResumeAsync hanya memperbarui langganan yang ada dengan instans IAsyncObserver
logika baru dan tidak mengubah fakta bahwa konsumen ini sudah berlangganan aliran ini.
Bagaimana konsumen menjadi tua subscriptionHandle
? Ada 2 opsi. Konsumen mungkin telah mempertahankan handel yang diberikan kembali dari operasi asli SubscribeAsync
dan dapat menggunakannya sekarang. Atau, jika konsumen tidak memiliki handel, konsumen dapat meminta IAsyncStream<T>
semua handel langganan aktifnya, dengan memanggil:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Konsumen sekarang dapat melanjutkan semuanya atau berhenti berlangganan dari beberapa jika ingin.
Tip
Jika butir konsumen mengimplementasikan IAsyncObserver<T> antarmuka secara langsung (public class MyGrain<T> : Grain, IAsyncObserver<T>
), secara teori seharusnya tidak diperlukan untuk melampirkan IAsyncObserver
kembali dan dengan demikian tidak perlu memanggil ResumeAsync
. Runtime streaming harus dapat secara otomatis mencari tahu bahwa biji-bijian IAsyncObserver
sudah diterapkan dan hanya akan memanggil metode tersebut IAsyncObserver
. Namun, runtime streaming saat ini tidak mendukung ini dan kode biji-bijian masih perlu secara eksplisit memanggil ResumeAsync
, bahkan jika biji-bijian IAsyncObserver
diterapkan secara langsung.
Langganan eksplisit dan implisit
Secara default, konsumen stream harus secara eksplisit berlangganan aliran. Langganan ini biasanya akan dipicu oleh beberapa pesan eksternal yang diterima grain (atau klien) yang menginstruksikannya untuk berlangganan. Misalnya, dalam layanan obrolan ketika pengguna bergabung dengan ruang obrolan biji-bijiannya menerima JoinChatGroup
pesan dengan nama obrolan, yang akan menyebabkan biji-bijian pengguna berlangganan aliran obrolan ini.
Selain itu, Orleans stream juga mendukung langganan implisit. Dalam model ini, biji-bijian tidak secara eksplisit berlangganan aliran. Butir ini berlangganan secara otomatis, implisit, hanya berdasarkan identitas biji-bijiannya ImplicitStreamSubscriptionAttributedan . Nilai utama langganan implisit memungkinkan aktivitas streaming memicu aktivasi biji-bijian (karenanya memicu langganan) secara otomatis. Misalnya, menggunakan aliran SMS, jika satu butir ingin menghasilkan aliran dan biji-bijian lain memproses aliran ini, produsen perlu mengetahui identitas butir konsumen dan melakukan panggilan biji-bijian kepadanya yang memberi tahunya untuk berlangganan aliran. Hanya setelah itu dapat mulai mengirim peristiwa. Sebagai gantinya, menggunakan langganan implisit, produsen hanya dapat mulai memproduksi peristiwa ke aliran, dan butir konsumen akan secara otomatis diaktifkan dan berlangganan aliran. Dalam hal ini, produser tidak peduli sama sekali siapa yang membaca acara
Implementasi MyGrainType
biji-bijian dapat mendeklarasikan atribut [ImplicitStreamSubscription("MyStreamNamespace")]
. Ini memberi tahu runtime streaming bahwa ketika peristiwa dihasilkan pada aliran yang identitasnya adalah GUID XXX dan "MyStreamNamespace"
namespace, itu harus dikirimkan ke butir yang identitasnya adalah XXX jenis MyGrainType
. Artinya, peta runtime mengalir <XXX, MyStreamNamespace>
ke butir <XXX, MyGrainType>
konsumen .
Kehadiran ImplicitStreamSubscription
menyebabkan runtime streaming secara otomatis berlangganan biji-bijian ini ke streaming dan mengirimkan peristiwa streaming ke dalamnya. Namun, kode biji-bijian masih perlu memberi tahu runtime bagaimana peristiwa ingin diproses. Pada dasarnya, perlu melampirkan IAsyncObserver
. Oleh karena itu, ketika biji-bijian diaktifkan, kode biji-bijian di dalamnya OnActivateAsync
perlu memanggil:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Menulis logika langganan
Di bawah ini adalah panduan tentang cara menulis logika langganan untuk berbagai kasus: langganan eksplisit dan implisit, aliran yang dapat digulung balik dan tidak dapat digulung balik. Perbedaan utama antara langganan eksplisit dan implisit adalah bahwa untuk implisit, biji-bijian selalu memiliki tepat satu langganan implisit untuk setiap namespace streaming; tidak ada cara untuk membuat beberapa langganan (tidak ada perkalian langganan), tidak ada cara untuk berhenti berlangganan, dan logika grain selalu hanya perlu melampirkan logika pemrosesan. Itu juga berarti bahwa untuk langganan implisit tidak pernah ada kebutuhan untuk melanjutkan langganan. Di sisi lain, untuk langganan eksplisit, seseorang perlu Melanjutkan langganan, jika tidak, jika biji-bijian berlangganan lagi, itu akan mengakibatkan butiran berlangganan beberapa kali.
Langganan implisit:
Untuk langganan implisit, biji-bijian masih perlu berlangganan untuk melampirkan logika pemrosesan. Ini dapat dilakukan dalam butir konsumen dengan mengimplementasikan IStreamSubscriptionObserver
antarmuka dan IAsyncObserver<T>
, memungkinkan biji-bijian untuk diaktifkan secara terpisah dari berlangganan. Untuk berlangganan aliran, biji-bijian membuat handel dan panggilan await handle.ResumeAsync(this)
dalam metodenya OnSubscribed(...)
.
Untuk memproses pesan, metode ini IAsyncObserver<T>.OnNextAsync(...)
diimplementasikan untuk menerima data aliran dan token urutan. Atau, ResumeAsync
metode ini dapat mengambil sekumpulan delegasi yang mewakili metode IAsyncObserver<T>
antarmuka, , onNextAsync
, onErrorAsync
dan onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Langganan eksplisit:
Untuk langganan eksplisit, grain harus memanggil SubscribeAsync
untuk berlangganan aliran. Ini membuat langganan, serta melampirkan logika pemrosesan. Langganan eksplisit akan ada sampai butir berhenti berlangganan, jadi jika butir dinonaktifkan dan diaktifkan kembali, butiran masih berlangganan secara eksplisit, tetapi tidak ada logika pemrosesan yang akan dilampirkan. Dalam hal ini, biji-bijian perlu melampirkan kembali logika pemrosesan. Untuk melakukan itu, dalam , OnActivateAsync
biji-bijian pertama-tama perlu mencari tahu langganan apa yang dimilikinya, dengan memanggil IAsyncStream<T>.GetAllSubscriptionHandles(). Biji-bijian harus dijalankan ResumeAsync
pada setiap handel yang ingin diproses atau Berhenti BerlanggananAsync pada penanganan apa pun yang dilakukannya. Grain juga dapat secara opsional menentukan StreamSequenceToken
sebagai argumen untuk ResumeAsync
panggilan, yang akan menyebabkan langganan eksplisit ini mulai mengkonsumsi dari token tersebut.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Streaming pesanan dan token urutan
Urutan pengiriman peristiwa antara produsen individu dan konsumen individu tergantung pada penyedia streaming.
Dengan SMS, produsen secara eksplisit mengontrol urutan peristiwa yang dilihat oleh konsumen dengan mengontrol cara produsen menerbitkannya. Secara default (jika SimpleMessageStreamProviderOptions.FireAndForgetDelivery opsi untuk penyedia SMS diatur ke false) dan jika produsen menunggu setiap OnNextAsync
panggilan, peristiwa tiba dalam urutan FIFO. Dalam SMS terserah produsen untuk memutuskan cara menangani kegagalan pengiriman yang akan ditunjukkan oleh kerusakan Task
yang dikembalikan oleh OnNextAsync
panggilan.
Aliran Azure Queue tidak menjamin pesanan FIFO, karena Antrean Azure yang mendasar tidak menjamin urutan dalam kasus kegagalan. (Mereka menjamin urutan FIFO dalam eksekusi bebas kegagalan.) Ketika produsen menghasilkan peristiwa ke Azure Queue, jika operasi antrean gagal, terserah produsen untuk mencoba antrean lain dan nantinya menangani pesan duplikat potensial. Di sisi pengiriman, Orleans runtime Streaming menghapus antrean peristiwa dari antrean dan mencoba mengirimkannya untuk diproses kepada konsumen. Orleans Runtime Streaming menghapus peristiwa dari antrean hanya setelah pemrosesan berhasil. Jika pengiriman atau pemrosesan gagal, peristiwa tidak dihapus dari antrean dan akan muncul kembali secara otomatis dalam antrean nanti. Runtime Streaming akan mencoba mengirimkannya lagi, sehingga berpotensi melanggar pesanan FIFO. Perilaku di atas cocok dengan semantik normal Azure Queues.
Urutan yang Ditentukan Aplikasi: Untuk menangani masalah pemesanan di atas, aplikasi dapat secara opsional menentukan urutannya. Ini dicapai melalui StreamSequenceToken, yang merupakan objek buram IComparable yang dapat digunakan untuk memesan peristiwa. Produser dapat meneruskan opsional StreamSequenceToken
ke OnNext
panggilan. Ini StreamSequenceToken
akan diteruskan kepada konsumen dan akan disampaikan bersama dengan acara tersebut. Dengan begitu, aplikasi dapat beralasan dan membangun ulang urutannya secara independen dari runtime streaming.
Aliran yang dapat digulung balik
Beberapa aliran hanya memungkinkan aplikasi untuk berlangganannya mulai dari titik waktu terbaru, sementara aliran lain memungkinkan "kembali ke waktu". Kemampuan terakhir tergantung pada teknologi antrean yang mendasar dan penyedia aliran tertentu. Misalnya, Azure Queues hanya mengizinkan penggunaan peristiwa antrean terbaru, sementara EventHub memungkinkan pemutaran ulang peristiwa dari titik waktu arbitrer (hingga beberapa waktu kedaluwarsa). Aliran yang mendukung kembali ke waktu yang disebut aliran yang dapat digulung balik.
Konsumen aliran yang dapat digulung balik dapat meneruskan StreamSequenceToken
ke SubscribeAsync
panggilan. Runtime akan mengirimkan peristiwa ke dalamnya mulai dari itu StreamSequenceToken
. Token null berarti konsumen ingin menerima peristiwa mulai dari yang terbaru.
Kemampuan untuk memutar balik aliran sangat berguna dalam skenario pemulihan. Misalnya, pertimbangkan butir yang berlangganan aliran dan secara berkala memeriksa statusnya bersama dengan token urutan terbaru. Saat pulih dari kegagalan, biji-bijian dapat berlangganan kembali ke aliran yang sama dari token urutan titik pemeriksaan terbaru, sehingga pulih tanpa kehilangan peristiwa apa pun yang dihasilkan sejak titik pemeriksaan terakhir.
Penyedia Azure Event Hubs dapat digulung balik. Anda dapat menemukan kodenya di GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. Penyedia SMS dan Azure Queue tidak dapat digulung balik.
Pemrosesan stateless yang diskalakan secara otomatis
Secara default, Orleans Streaming ditargetkan untuk mendukung sejumlah besar aliran yang relatif kecil, masing-masing diproses oleh satu atau beberapa butir stateful. Secara kolektif, pemrosesan semua aliran bersama-sama dipecah di antara sejumlah besar biji-bijian reguler (stateful). Kode aplikasi mengontrol sharding ini dengan menetapkan id aliran dan id biji-bijian dan dengan berlangganan secara eksplisit. Tujuannya adalah pemrosesan stateful pecahan.
Namun, ada juga skenario menarik dari pemrosesan stateless yang diskalakan secara otomatis. Dalam skenario ini, aplikasi memiliki sejumlah kecil aliran (atau bahkan satu aliran besar) dan tujuannya adalah pemrosesan tanpa status. Contohnya adalah aliran peristiwa global, di mana pemrosesan melibatkan decoding setiap peristiwa dan berpotensi meneruskannya ke aliran lain untuk pemrosesan stateful lebih lanjut. Pemrosesan aliran yang diskalakan tanpa status dapat didukung melalui Orleans StatelessWorkerAttribute biji-bijian.
Status Pemrosesan Stateless Yang Diskalakan Secara Otomatis Saat Ini: Ini belum diimplementasikan. Upaya untuk berlangganan aliran dari StatelessWorker
biji-bijian akan mengakibatkan perilaku yang tidak ditentukan. Kami sedang mempertimbangkan untuk mendukung opsi ini.
Biji-bijian dan Orleans klien
Orleans aliran bekerja secara seragam di seluruh biji-bijian dan Orleans klien. Artinya, API yang sama dapat digunakan di dalam biji-bijian dan di Orleans klien untuk menghasilkan dan mengonsumsi peristiwa. Ini sangat menyederhanakan logika aplikasi, membuat API sisi klien khusus, seperti Grain Observers, redundan.
Pub-sub streaming yang dikelola sepenuhnya dan andal
Untuk melacak langganan streaming, Orleans menggunakan komponen runtime yang disebut Streaming Pub-Sub yang berfungsi sebagai titik pertemuan bagi konsumen stream dan produsen streaming. Pub-sub melacak semua langganan streaming dan mempertahankannya, dan mencocokkan konsumen streaming dengan produsen streaming.
Aplikasi dapat memilih di mana dan bagaimana data Pub-Sub disimpan. Komponen Pub-Sub sendiri diimplementasikan sebagai biji-bijian (disebut PubSubRendezvousGrain
), yang menggunakan Orleans persistensi deklaratif. PubSubRendezvousGrain
menggunakan penyedia penyimpanan bernama PubSubStore
. Seperti halnya biji-bijian apa pun, Anda dapat menunjuk implementasi untuk penyedia penyimpanan. Untuk Streaming Pub-Sub Anda dapat mengubah implementasi PubSubStore
pada waktu konstruksi silo menggunakan pembangun host silo:
Berikut ini mengonfigurasi Pub-Sub untuk menyimpan statusnya dalam tabel Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Dengan begitu data Pub-Sub akan disimpan secara durab di Azure Table. Untuk pengembangan awal, Anda juga dapat menggunakan penyimpanan memori. Selain Pub-Sub, Orleans Streaming Runtime mengirimkan peristiwa dari produsen kepada konsumen, mengelola semua sumber daya runtime yang dialokasikan untuk aliran yang digunakan secara aktif, dan secara transparan mengumpulkan sumber daya runtime dari aliran yang tidak digunakan.
Konfigurasi
Untuk menggunakan aliran, Anda perlu mengaktifkan penyedia streaming melalui host silo atau penyusun klien kluster. Anda dapat membaca selengkapnya tentang penyedia streaming di sini. Contoh penyiapan penyedia aliran:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");