Orleans mulai cepat streaming
Panduan ini akan menunjukkan kepada Anda cara cepat untuk menyiapkan dan menggunakan Orleans Aliran. Untuk mempelajari selengkapnya tentang detail fitur streaming, baca bagian lain dari dokumentasi ini.
Konfigurasi yang diperlukan
Dalam panduan ini, Anda akan menggunakan aliran berbasis memori yang menggunakan pesan biji-bijian untuk mengirim data streaming ke pelanggan. Anda akan menggunakan penyedia penyimpanan dalam memori untuk menyimpan daftar langganan. Menggunakan mekanisme berbasis memori untuk streaming dan penyimpanan hanya ditujukan untuk pengembangan dan pengujian lokal, dan tidak ditujukan untuk lingkungan produksi.
Pada silo, di mana silo
adalah ISiloBuilder, panggil AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Pada klien kluster, di mana client
adalah IClientBuilder, panggil AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
Dalam panduan ini, kita akan menggunakan aliran berbasis pesan sederhana yang menggunakan pesan biji-bijian untuk mengirim data streaming ke pelanggan. Kami akan menggunakan penyedia penyimpanan dalam memori untuk menyimpan daftar langganan, sehingga ini bukan pilihan bijaksana untuk aplikasi produksi nyata.
Pada silo, di mana hostBuilder
adalah ISiloHostBuilder
, panggil AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Pada klien kluster, di mana clientBuilder
adalah IClientBuilder
, panggil AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Catatan
Secara default, pesan yang diteruskan melalui Aliran Pesan Sederhana dianggap tidak dapat diubah, dan dapat diteruskan oleh referensi ke biji-bijian lain. Untuk menonaktifkan perilaku ini, Anda harus mengonfigurasi penyedia SMS untuk menonaktifkan SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Anda dapat membuat aliran, mengirim data menggunakannya sebagai produsen dan juga menerima data sebagai pelanggan.
Menghasilkan peristiwa
Relatif mudah untuk menghasilkan peristiwa untuk aliran. Anda harus terlebih dahulu mendapatkan akses ke penyedia streaming yang Anda tentukan dalam konfigurasi sebelumnya ("StreamProvider"
), lalu memilih streaming dan mendorong data ke dalamnya.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Relatif mudah untuk menghasilkan peristiwa untuk aliran. Anda harus terlebih dahulu mendapatkan akses ke penyedia streaming yang Anda tentukan dalam konfigurasi sebelumnya ("SMSProvider"
), lalu memilih streaming dan mendorong data ke dalamnya.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Seperti yang Anda lihat, aliran kami memiliki GUID dan namespace. Ini akan memudahkan untuk mengidentifikasi aliran unik. Misalnya, namespace untuk ruang obrolan dapat berupa "Rooms" dan GUID dapat menjadi GUID RoomGrain pemilik.
Di sini kita menggunakan GUID dari beberapa ruang obrolan yang diketahui. OnNextAsync
Menggunakan metode aliran, kita dapat mendorong data ke dalamnya. Mari kita lakukan di dalam timer, menggunakan angka acak. Anda juga dapat menggunakan jenis data lain untuk aliran.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Berlangganan dan menerima data streaming
Untuk menerima data, Anda dapat menggunakan langganan implisit dan eksplisit, yang dijelaskan secara lebih rinci di Langganan eksplisit dan implisit. Contoh ini menggunakan langganan implisit, yang lebih mudah. Ketika jenis grain ingin berlangganan streaming secara implisit, jenis tersebut menggunakan atribut [ImplicitStreamSubscription(namespace)].
Untuk kasus Anda, tentukan ReceiverGrain
seperti ini:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Setiap kali data didorong ke aliran namespace RANDOMDATA
, seperti yang kita miliki di timer, butir jenis ReceiverGrain
dengan aliran yang sama Guid
akan menerima pesan. Bahkan jika tidak ada aktivasi biji-bijian yang saat ini ada, runtime akan secara otomatis membuat yang baru dan mengirim pesan ke dalamnya.
Agar ini berfungsi, kita perlu menyelesaikan proses langganan dengan mengatur metode kita OnNextAsync
untuk menerima data. Untuk melakukannya, kita ReceiverGrain
harus memanggil sesuatu seperti ini dalam OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Anda sudah siap! Sekarang satu-satunya persyaratan adalah bahwa sesuatu memicu pembuatan butir produsen, dan kemudian akan mendaftarkan timer dan mulai mengirim ints acak ke semua pihak yang tertarik.
Sekali lagi, panduan ini melewati banyak detail dan hanya baik untuk menunjukkan gambaran besar. Baca bagian lain dari manual ini dan sumber daya lain di RX untuk mendapatkan pemahaman yang baik tentang apa yang tersedia dan bagaimana.
Pemrograman reaktif dapat menjadi pendekatan yang sangat kuat untuk memecahkan banyak masalah. Anda misalnya dapat menggunakan LINQ di pelanggan untuk memfilter angka dan melakukan semua hal menarik.