Model penarikan umpan perubahan di Azure Cosmos DB
BERLAKU UNTUK: NoSQL
Anda dapat menggunakan model penarikan umpan perubahan untuk menggunakan umpan perubahan Azure Cosmos DB dengan kecepatan Anda sendiri. Mirip dengan prosesor umpan perubahan, Anda dapat menggunakan model penarikan umpan perubahan untuk menyejajarkan pemrosesan perubahan di beberapa konsumen umpan perubahan.
Bandingkan dengan prosesor umpan perubahan
Banyak skenario dapat memproses umpan perubahan dengan menggunakan prosesor umpan perubahan atau model penarikan umpan perubahan. Token kelanjutan model penarikan dan kontainer sewa prosesor umpan perubahan berfungsi sebagai marka buku untuk item terakhir yang diproses atau batch item dalam umpan perubahan.
Namun, Anda tidak dapat mengonversi token kelanjutan menjadi sewa atau sebaliknya.
Catatan
Dalam kebanyakan kasus, ketika Anda perlu membaca dari umpan perubahan, opsi paling sederhana adalah menggunakan prosesor umpan perubahan.
Anda harus pertimbangkan model penarikan dalam skenario ini:
- Untuk membaca perubahan dari kunci partisi tertentu.
- Untuk mengontrol kecepatan di mana klien Anda menerima perubahan untuk diproses.
- Untuk melakukan pembacaan satu kali data yang ada di umpan perubahan (misalnya, untuk melakukan migrasi data).
Berikut adalah beberapa perbedaan utama antara prosesor umpan perubahan dan model penarikan umpan perubahan:
Fitur | Prosesor umpan perubahan | Model penarikan umpan perubahan |
---|---|---|
Melacak titik saat ini dalam memproses umpan perubahan | Sewa (disimpan dalam kontainer Azure Cosmos DB) | Token kelanjutan (disimpan dalam memori atau ditahan secara manual) |
Kemampuan untuk memutar ulang perubahan sebelumnya | Ya, dengan model pendorongan | Ya, dengan model penarikan |
Polling untuk perubahan di masa mendatang | Memeriksa perubahan secara otomatis berdasarkan nilai yang ditentukan WithPollInterval pengguna |
Manual |
Perilaku tanpa perubahan baru | Tunggu nilai secara otomatis lalu WithPollInterval centang ulang |
Harus memeriksa status dan memeriksa ulang secara manual |
Memproses perubahan dari seluruh kontainer | Ya, dan secara otomatis diparalelkan di beberapa utas dan mesin yang menggunakan dari kontainer yang sama | Ya, dan diparalelkan secara manual dengan menggunakan FeedRange |
Memproses perubahan hanya dari satu kunci partisi | Tidak didukung | Ya |
Catatan
Saat Anda menggunakan model penarikan, tidak seperti saat membaca dengan menggunakan prosesor umpan perubahan, Anda harus secara eksplisit menangani kasus di mana tidak ada perubahan baru.
Bekerja dengan model penarikan
Untuk memproses umpan perubahan dengan menggunakan model penarikan, buat instans FeedIterator
. Ketika Anda awalnya membuat FeedIterator
, Anda harus menentukan nilai yang diperlukan ChangeFeedStartFrom
, yang terdiri dari posisi awal untuk membaca perubahan dan nilai yang ingin Anda gunakan untuk FeedRange
. FeedRange
adalah rentang nilai kunci partisi dan menentukan item yang dapat dibaca dari umpan perubahan dengan menggunakan spesifik FeedIterator
tersebut. Anda juga harus menentukan nilai yang diperlukan ChangeFeedMode
untuk mode di mana Anda ingin memproses perubahan: versi terbaru atau semua versi dan penghapusan. Gunakan atau ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
untuk menunjukkan mode mana yang ingin Anda gunakan untuk membaca umpan perubahan. Saat Anda menggunakan semua versi dan mode penghapusan, Anda harus memilih umpan perubahan mulai dari nilai baik Now()
dari atau dari token kelanjutan tertentu.
Anda dapat secara opsional menentukan ChangeFeedRequestOptions
untuk mengatur PageSizeHint
. Saat diatur, properti ini menetapkan jumlah maksimum item yang diterima per halaman. Jika operasi dalam koleksi terpantau dilakukan melalui prosedur yang disimpan, cakupan transaksi dipertahankan saat membaca item dari umpan perubahan. Akibatnya, jumlah item yang diterima mungkin lebih tinggi dari nilai yang ditentukan sehingga item yang diubah oleh transaksi yang sama dikembalikan sebagai bagian dari satu batch atomik.
Berikut adalah contoh cara mendapatkan FeedIterator
dalam mode versi terbaru yang mengembalikan objek entitas, dalam hal User
ini objek:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tip
Sebelum versi 3.34.0
, mode versi terbaru dapat digunakan dengan mengatur ChangeFeedMode.Incremental
. Baik Incremental
dan LatestVersion
lihat mode versi terbaru dari umpan perubahan dan aplikasi yang menggunakan salah satu mode akan melihat perilaku yang sama.
Semua versi dan mode penghapusan dalam pratinjau dan dapat digunakan dengan pratinjau versi >.NET SDK = 3.32.0-preview
. Berikut adalah contoh untuk mendapatkan FeedIterator
di semua versi dan menghapus mode yang mengembalikan User
objek:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Catatan
Dalam mode versi terbaru, Anda menerima objek yang mewakili item yang berubah, dengan beberapa metadata tambahan. Semua versi dan mode penghapusan mengembalikan model data yang berbeda. Untuk informasi selengkapnya, lihat Mengurai objek respons.
Anda bisa mendapatkan sampel lengkap untuk mode versi terbaru atau semua versi dan mode penghapusan.
Mengonsumsi umpan perubahan melalui aliran
FeedIterator
untuk kedua mode umpan perubahan memiliki dua opsi. Selain contoh yang mengembalikan objek entitas, Anda juga dapat memperoleh respons dengan Stream
dukungan. Aliran memungkinkan Anda membaca data tanpa terlebih dahulu deserialisasi, sehingga Anda menghemat sumber daya klien.
Berikut adalah contoh cara mendapatkan FeedIterator
dalam mode versi terbaru yang mengembalikan Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Mengonsumsi perubahan untuk seluruh kontainer
Jika Anda tidak menyediakan FeedRange
parameter ke FeedIterator
, Anda dapat memproses seluruh umpan perubahan kontainer dengan kecepatan Anda sendiri. Berikut adalah contoh, yang mulai membaca semua perubahan, dimulai pada waktu saat ini dengan menggunakan mode versi terbaru:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Karena umpan perubahan secara efektif merupakan daftar item tak terbatas yang mencakup semua penulisan dan pembaruan di masa mendatang, nilainya HasMoreResults
selalu true
. Ketika Anda mencoba membaca umpan perubahan dan tidak ada perubahan baru yang tersedia, Anda menerima respons dengan NotModified
status. Dalam contoh sebelumnya, ini ditangani dengan menunggu lima detik sebelum memeriksa ulang perubahan.
Mengonsumsi perubahan untuk kunci partisi
Dalam beberapa kasus, Anda mungkin hanya ingin memproses perubahan untuk kunci partisi tertentu. Anda dapat memperoleh FeedIterator
kunci partisi tertentu dan memproses perubahan dengan cara yang sama seperti yang Anda bisa untuk seluruh kontainer.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Gunakan FeedRange untuk paralelisasi
Dalam prosesor umpan perubahan, pekerjaan tersebar di beberapa konsumen secara otomatis. Dalam model penarikan umpan perubahan, Anda dapat menggunakan FeedRange
untuk paralelisasi pemrosesan umpan perubahan. FeedRange
mewakili rentang nilai kunci partisi.
Berikut adalah contoh yang menunjukkan cara mendapatkan daftar rentang untuk kontainer Anda:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Saat Anda mendapatkan daftar FeedRange
nilai untuk kontainer, Anda mendapatkan satu FeedRange
per partisi fisik.
Dengan menggunakan FeedRange
, Anda dapat membuat FeedIterator
untuk menyejajarkan pemrosesan umpan perubahan di beberapa komputer atau utas. Berbeda dengan contoh sebelumnya yang menunjukkan cara mendapatkan FeedIterator
untuk seluruh kontainer atau satu kunci partisi, Anda dapat menggunakan FeedRanges untuk mendapatkan beberapa FeedIterator yang dapat memproses umpan perubahan secara paralel.
Saat ingin menggunakan FeedRanges, Anda harus memiliki proses orkestrator yang mendapatkan FeedRanges dan mendistribusikannya ke mesin tersebut. Distribusi ini mungkin:
- Menggunakan
FeedRange.ToJsonString
dan mendistribusikan nilai untai (karakter) ini. Konsumen dapat menggunakan nilai ini denganFeedRange.FromJsonString
. - Jika distribusi sedang berproses, lewati referensi objek
FeedRange
.
Berikut adalah sampel yang menunjukkan cara membaca dari awal umpan perubahan kontainer dengan menggunakan dua komputer terpisah hipotetis yang dibaca secara paralel:
Mesin 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Mesin 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Menyimpan token kelanjutan
Anda dapat menyimpan posisi FeedIterator
dengan mendapatkan token kelanjutan. Token kelanjutan adalah nilai untai (karakter) yang melacak perubahan terakhir yang diproses FeedIterator Anda dan mengizinkan FeedIterator
untuk melanjutkan pada titik ini nantinya. Token kelanjutan, jika ditentukan, lebih diutamakan dari waktu mulai dan dimulai dari nilai awal. Kode berikut membaca umpan perubahan sejak pembuatan kontainer. Setelah tidak ada lagi perubahan yang tersedia, kode tersebut akan menahan token kelanjutan sehingga konsumsi umpan perubahan dapat dilanjutkan nanti.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Saat Anda menggunakan mode versi terbaru, token kelanjutan FeedIterator
tidak pernah kedaluwarsa selama kontainer Azure Cosmos DB masih ada. Saat Anda menggunakan semua versi dan mode penghapusan, FeedIterator
token kelanjutan valid selama perubahan terjadi dalam jendela retensi untuk pencadangan berkelanjutan.