Pustaka System.Threading.Channels
Namespace System.Threading.Channels menyediakan serangkaian struktur data sinkronisasi untuk meneruskan data antara produsen dan konsumen secara asinkron. Pustaka menargetkan .NET Standard dan berfungsi pada semua implementasi .NET.
Pustaka ini tersedia dalam paket System.Threading.Channels NuGet. Namun, jika Anda menggunakan .NET Core 3.0 atau yang lebih baru, paket disertakan sebagai bagian dari kerangka kerja.
Model pemrograman konseptual produsen/konsumen
Saluran adalah implementasi dari model pemrograman konseptual produsen/konsumen. Dalam model pemrograman ini, produsen secara asinkron menghasilkan data, dan konsumen secara asinkron mengonsumsi data tersebut. Dengan kata lain, model ini meneruskan data dari satu pihak ke pihak lain melalui antrean first-in first-out ("FIFO"). Cobalah untuk memikirkan saluran seperti yang Anda lakukan jenis koleksi umum lainnya, seperti List<T>
. Perbedaan utamanya adalah koleksi ini mengelola sinkronisasi dan menyediakan berbagai model konsumsi melalui opsi pembuatan pabrik. Opsi ini mengontrol perilaku saluran, seperti berapa banyak elemen yang diizinkan untuk disimpan dan apa yang terjadi jika batas tersebut tercapai, atau apakah saluran diakses oleh beberapa produsen atau beberapa konsumen secara bersamaan.
Strategi pembatas
Tergantung pada bagaimana dibuat Channel<T>
, pembaca dan penulisnya bersifat berbeda.
Untuk membuat saluran yang menentukan kapasitas maksimum, panggil Channel.CreateBounded. Untuk membuat saluran yang digunakan oleh sejumlah pembaca dan penulis secara bersamaan, hubungi Channel.CreateUnbounded. Setiap strategi pembatas mengekspos berbagai opsi yang ditentukan kreator, baik BoundedChannelOptions atau UnboundedChannelOptions masing-masing.
Catatan
Terlepas dari strategi pembatas, saluran akan selalu melempar ChannelClosedException ketika digunakan setelah ditutup.
Saluran tidak terikat
Untuk membuat saluran yang tidak terbatas, panggil salah Channel.CreateUnbounded satu kelebihan beban:
var channel = Channel.CreateUnbounded<T>();
Saat Anda membuat saluran yang tidak terbatas, secara default, saluran dapat digunakan oleh sejumlah pembaca dan penulis secara bersamaan. Atau, Anda dapat menentukan perilaku nondefault saat membuat saluran yang tidak terbatas dengan menyediakan UnboundedChannelOptions
instans. Kapasitas saluran tidak terbatas dan semua penulisan dilakukan secara sinkron. Untuk contoh selengkapnya, lihat Pola pembuatan yang tidak terbatas.
Saluran terikat
Untuk membuat saluran terikat, panggil salah Channel.CreateBounded satu kelebihan beban:
var channel = Channel.CreateBounded<T>(7);
Kode sebelumnya membuat saluran yang memiliki kapasitas 7
maksimum item. Saat Anda membuat saluran terikat, saluran terikat ke kapasitas maksimum. Ketika terikat tercapai, perilaku defaultnya adalah bahwa saluran secara asinkron memblokir produsen sampai ruang tersedia. Anda dapat mengonfigurasi perilaku ini dengan menentukan opsi saat membuat saluran. Saluran terikat dapat dibuat dengan nilai kapasitas apa pun yang lebih besar dari nol. Untuk contoh lain, lihat Pola pembuatan terikat.
Perilaku mode penuh
Saat menggunakan saluran terikat, Anda dapat menentukan perilaku yang dipatuhi saluran saat terikat yang dikonfigurasi tercapai. Tabel berikut ini mencantumkan perilaku mode penuh untuk setiap BoundedChannelFullMode nilai:
Nilai | Perilaku |
---|---|
BoundedChannelFullMode.Wait | Ini adalah nilai default. Panggilan untuk WriteAsync menunggu ruang tersedia untuk menyelesaikan operasi tulis. Panggilan untuk TryWrite segera kembali false . |
BoundedChannelFullMode.DropNewest | Menghapus dan mengabaikan item terbaru di saluran untuk memberi ruang bagi item yang sedang ditulis. |
BoundedChannelFullMode.DropOldest | Menghapus dan mengabaikan item terlama di saluran untuk memberi ruang bagi item yang sedang ditulis. |
BoundedChannelFullMode.DropWrite | Menghilangkan item yang sedang ditulis. |
Penting
Channel<TWrite,TRead>.Writer Setiap kali menghasilkan lebih cepat daripada yang Channel<TWrite,TRead>.Reader dapat dikonsumsi, penulis saluran mengalami tekanan balik.
API Produser
Fungsionalitas produsen diekspos pada Channel<TWrite,TRead>.Writer. API produsen dan perilaku yang diharapkan dirinci dalam tabel berikut:
API | Perilaku yang diperkirakan |
---|---|
ChannelWriter<T>.Complete | Menandai saluran sebagai selesai, yang berarti tidak ada lagi item yang ditulis ke saluran tersebut. |
ChannelWriter<T>.TryComplete | Mencoba menandai saluran sebagai selesai, yang berarti tidak ada lagi data yang ditulis ke saluran tersebut. |
ChannelWriter<T>.TryWrite | Mencoba menulis item yang ditentukan ke saluran. Ketika digunakan dengan saluran yang tidak terbatas, ini selalu kembali true kecuali penulis saluran memberi sinyal penyelesaian dengan ChannelWriter<T>.Complete, atau ChannelWriter<T>.TryComplete. |
ChannelWriter<T>.WaitToWriteAsync | Mengembalikan yang ValueTask<TResult> selesai ketika spasi tersedia untuk menulis item. |
ChannelWriter<T>.WriteAsync | Menulis item secara asinkron ke saluran. |
Consumer API
Fungsionalitas konsumen diekspos pada Channel<TWrite,TRead>.Reader. API konsumen dan perilaku yang diharapkan dirinci dalam tabel berikut:
API | Perilaku yang diperkirakan |
---|---|
ChannelReader<T>.ReadAllAsync | Membuat yang IAsyncEnumerable<T> memungkinkan membaca semua data dari saluran. |
ChannelReader<T>.ReadAsync | Secara asinkron membaca item dari saluran. |
ChannelReader<T>.TryPeek | Mencoba mengintip item dari saluran. |
ChannelReader<T>.TryRead | Mencoba membaca item dari saluran. |
ChannelReader<T>.WaitToReadAsync | Mengembalikan yang ValueTask<TResult> selesai saat data tersedia untuk dibaca. |
Pola penggunaan umum
Ada beberapa pola penggunaan untuk saluran. API dirancang agar sederhana, konsisten, dan fleksibel mungkin. Semua metode asinkron mengembalikan ValueTask
(atau ValueTask<bool>
) yang mewakili operasi asinkron ringan yang dapat menghindari alokasi jika operasi selesai secara sinkron dan berpotensi bahkan secara asinkron. Selain itu, API dirancang agar dapat dikomposisikan, karena pembuat saluran membuat janji tentang penggunaan yang dimaksudkan. Ketika saluran dibuat dengan parameter tertentu, implementasi internal dapat beroperasi lebih efisien mengetahui janji-janji ini.
Pola pembuatan
Bayangkan Anda membuat solusi produsen/konsumen untuk sistem posisi global (GPS). Anda ingin melacak koordinat perangkat dari waktu ke waktu. Objek koordinat sampel mungkin terlihat seperti ini:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Pola pembuatan yang tidak terikat
Salah satu pola penggunaan umum adalah membuat saluran tidak terikat default:
var channel = Channel.CreateUnbounded<Coordinates>();
Namun, mari kita bayangkan bahwa Anda ingin membuat saluran yang tidak terbatas dengan beberapa produsen dan konsumen:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
Dalam hal ini, semua tulisan sinkron, bahkan WriteAsync
. Ini karena saluran yang tidak terbatas selalu memiliki ruang yang tersedia untuk penulisan secara efektif segera. Namun, dengan AllowSynchronousContinuations
diatur ke true
, penulisan mungkin akhirnya melakukan pekerjaan yang terkait dengan pembaca dengan menjalankan kelanjutannya. Ini tidak memengaruhi sinkronisasi operasi.
Pola pembuatan terikat
Dengan saluran terikat, konfigurasi saluran harus diketahui oleh konsumen untuk membantu memastikan konsumsi yang tepat. Artinya, konsumen harus tahu perilaku apa yang dipamerkan saluran ketika terikat yang dikonfigurasi tercapai. Mari kita jelajahi beberapa pola pembuatan terikat umum.
Cara paling sederhana untuk membuat saluran terikat adalah dengan menentukan kapasitas:
var channel = Channel.CreateBounded<Coordinates>(1);
Kode sebelumnya membuat saluran terikat dengan kapasitas 1
maksimum . Opsi lain tersedia, beberapa opsi sama dengan saluran yang tidak terbatas, sementara yang lain khusus untuk saluran yang tidak terbatas:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
Dalam kode sebelumnya, saluran dibuat sebagai saluran terikat yang dibatasi hingga 1.000 item, dengan satu penulis tetapi banyak pembaca. Perilaku mode penuhnya didefinisikan sebagai DropWrite
, yang berarti bahwa ia menjatuhkan item yang ditulis jika saluran penuh.
Untuk mengamati item yang dihilangkan saat menggunakan saluran terikat, daftarkan itemDropped
panggilan balik:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Setiap kali saluran penuh dan item baru ditambahkan, itemDropped
panggilan balik dipanggil. Dalam contoh ini, panggilan balik yang disediakan menulis item ke konsol, tetapi Anda bebas untuk mengambil tindakan lain yang Anda inginkan.
Pola produsen
Bayangkan bahwa produser dalam skenario ini menulis koordinat baru ke saluran. Produser dapat melakukan ini dengan memanggil TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Kode produsen sebelumnya:
Channel<Coordinates>.Writer
Menerima (ChannelWriter<Coordinates>
) sebagai argumen, bersama dengan awalCoordinates
.- Menentukan perulangan bersyarat
while
yang mencoba memindahkan koordinat menggunakanTryWrite
.
Produsen alternatif mungkin menggunakan metode :WriteAsync
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Sekali lagi, Channel<Coordinates>.Writer
digunakan dalam perulangan while
. Tapi kali ini, WriteAsync metode ini dipanggil. Metode akan berlanjut hanya setelah koordinat ditulis. Ketika perulangan while
keluar, panggilan ke Complete dilakukan, yang menandakan bahwa tidak ada lagi data yang ditulis ke saluran.
Pola produsen lain adalah menggunakan metode , WaitToWriteAsync pertimbangkan kode berikut:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Sebagai bagian dari kondisi while
, hasil WaitToWriteAsync
panggilan digunakan untuk menentukan apakah akan melanjutkan perulangan.
Pola konsumen
Ada beberapa pola konsumen saluran umum. Ketika saluran tidak pernah berakhir, yang berarti menghasilkan data tanpa batas waktu, konsumen dapat menggunakan perulangan while (true)
, dan membaca data saat tersedia:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Catatan
Kode ini akan melemparkan pengecualian jika saluran ditutup.
Konsumen alternatif dapat menghindari kekhawatiran ini dengan menggunakan perulangan sementara berlapis, seperti yang ditunjukkan dalam kode berikut:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
Dalam kode sebelumnya, konsumen menunggu untuk membaca data. Setelah data tersedia, konsumen mencoba membacanya. Perulangan ini terus mengevaluasi sampai produsen saluran memberi sinyal bahwa ia tidak lagi memiliki data untuk dibaca. Dengan demikian, ketika produsen diketahui memiliki jumlah item terbatas yang dihasilkannya dan menandakan penyelesaian, konsumen dapat menggunakan await foreach
semantik untuk melakukan iterasi atas item:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Kode sebelumnya menggunakan ReadAllAsync metode untuk membaca semua koordinat dari saluran.