Azure Cosmos DB'de akış çekme modelini değiştirme
UYGULANANLAR: NoSQL
Değişiklik akışı çekme modelini kullanarak Azure Cosmos DB değişiklik akışını kendi hızınızda kullanabilirsiniz. Değişiklik akışı işlemcisine benzer şekilde, değişikliklerin birden çok değişiklik akışı tüketicisi arasında işlenmesini paralelleştirmek için değişiklik akışı çekme modelini kullanabilirsiniz.
Değişiklik akışı işlemcisi ile karşılaştırma
Birçok senaryo, değişiklik akışı işlemcisini veya değişiklik akışı çekme modelini kullanarak değişiklik akışını işleyebilir. Çekme modelinin devamlılık belirteçleri ve değişiklik akışı işlemcisinin kira kapsayıcısı hem son işlenen öğe için yer işaretleri hem de değişiklik akışındaki öğe toplu işleri için yer işaretleri olarak çalışır.
Ancak, devamlılık belirteçlerini kiralamaya dönüştüremezsiniz veya bunun tersi de geçerlidir.
Not
Çoğu durumda, değişiklik akışından okumanız gerektiğinde, en basit seçenek değişiklik akışı işlemcisini kullanmaktır.
Çekme modelini şu senaryolarda kullanmayı düşünmelisiniz:
- Belirli bir bölüm anahtarındaki değişiklikleri okumak için.
- İstemcinizin işleme değişikliklerini alma hızını denetlemek için.
- Değişiklik akışındaki mevcut verilerin tek seferlik okunmasını gerçekleştirmek için (örneğin, veri geçişi yapmak için).
Değişiklik akışı işlemcisi ile değişiklik akışı çekme modeli arasındaki bazı önemli farklar şunlardır:
Özellik | Değişiklik akışı işlemcisi | Değişiklik akışı çekme modeli |
---|---|---|
Değişiklik akışını işlemede geçerli noktayı izleme | Kira (Azure Cosmos DB kapsayıcısında depolanır) | Devamlılık belirteci (bellekte depolanır veya el ile kalıcı hale gelir) |
Geçmiş değişiklikleri yeniden yürütme olanağı | Evet, gönderme modeliyle | Evet, çekme modeliyle |
Gelecekteki değişiklikler için yoklama | Kullanıcı tarafından belirtilen WithPollInterval değere göre değişiklikleri otomatik olarak denetler |
El ile |
Yeni değişiklik yapılmadığı durumlarda davranış | değerini WithPollInterval otomatik olarak bekleyin ve ardından yeniden denetleyin |
Durumu denetlemeli ve el ile yeniden denetlemelidir |
Kapsayıcının tamamından değişiklikleri işleme | Evet ve aynı kapsayıcıdan tüketen birden çok iş parçacığı ve makine arasında otomatik olarak paralelleştirilmiştir | Evet ve kullanarak el ile paralelleştirilmiş FeedRange |
Değişiklikleri yalnızca tek bir bölüm anahtarından işleme | Desteklenmez | Yes |
Not
Çekme modelini kullandığınızda, değişiklik akışı işlemcisini kullanarak okumanın aksine, yeni değişiklik olmayan durumları açıkça işlemeniz gerekir.
Çekme modeliyle çalışma
Çekme modelini kullanarak değişiklik akışını işlemek için bir örneği FeedIterator
oluşturun. öğesini ilk oluşturduğunuzdaFeedIterator
, hem değişiklikleri okumak için başlangıç konumundan hem de için FeedRange
kullanmak istediğiniz değerden oluşan gerekli ChangeFeedStartFrom
bir değer belirtmeniz gerekir. FeedRange
bir bölüm anahtarı değerleri aralığıdır ve değişiklik akışından okunabilecek öğeleri belirli FeedIterator
bir kullanılarak belirtir. Değişiklikleri işlemek istediğiniz mod için de gerekli ChangeFeedMode
bir değer belirtmeniz gerekir: en son sürüm veya tüm sürümler ve silmeler. ChangeFeedMode.LatestVersion
Değişiklik akışını okumak için hangi modu kullanmak istediğinizi belirtmek için veya ChangeFeedMode.AllVersionsAndDeletes
kullanın. Tüm sürümleri ve silme modunu kullandığınızda, belirli bir devamlılık belirtecinin değerinden veya değerinden Now()
bir değişiklik akışı başlangıcı seçmeniz gerekir.
İsteğe bağlı olarak bir PageSizeHint
ayarlamak için belirtebilirsinizChangeFeedRequestOptions
. Bu özellik ayarlandığında, sayfa başına alınan en fazla öğe sayısını ayarlar. İzlenen koleksiyondaki işlemler saklı yordamlar aracılığıyla gerçekleştiriliyorsa, değişiklik akışındaki öğeler okunurken işlem kapsamı korunur. Sonuç olarak, alınan öğe sayısı belirtilen değerden yüksek olabilir, böylece aynı işlem tarafından değiştirilen öğeler tek bir atomik toplu işlemin parçası olarak döndürülür.
Burada, varlık nesnelerini döndüren en son sürüm modunda nasıl edinildiğini FeedIterator
gösteren bir örnek verilmiştir. Bu örnekte bir User
nesne verilmiştir:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
İpucu
sürümünden 3.34.0
önce, ayarıyla ChangeFeedMode.Incremental
en son sürüm modu kullanılabilir. LatestVersion
Hem hem de Incremental
değişiklik akışının en son sürüm moduna bakın ve her iki modu kullanan uygulamalar da aynı davranışı görür.
Tüm sürümler ve silmeler modu önizleme aşamasındadır ve önizleme .NET SDK sürümleri >= 3.32.0-preview
ile kullanılabilir. Aşağıda, nesneleri döndüren User
tüm sürümlerde ve silme modlarında elde etme FeedIterator
örneği verilmiştir:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Not
En son sürüm modunda, değiştirilen öğeyi temsil eden nesneleri ve bazı ek meta verileri alırsınız. Tüm sürümler ve silmeler modu farklı bir veri modeli döndürür. Daha fazla bilgi için bkz . Yanıt nesnesini ayrıştırma.
En son sürüm modu veya tüm sürümler ve silmeler modu için tam örneği alabilirsiniz.
Akışlar aracılığıyla değişiklik akışını kullanma
FeedIterator
her iki değişiklik akışı modu için iki seçenek vardır. Varlık nesnelerini döndüren örneklere ek olarak, yanıtı destekle Stream
de alabilirsiniz. Akışlar, verileri ilk seri durumdan çıkarmadan okumanıza olanak tanıyarak istemci kaynaklarından tasarruf etmenizi sağlar.
Aşağıda, döndüren en son sürüm modunda nasıl edinildiğini FeedIterator
gösteren bir örnek verilmişti Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Kapsayıcının tamamı için değişiklikleri kullanma
parametresini FeedIterator
sağlamazsanızFeedRange
, kapsayıcının değişiklik akışının tamamını kendi hızınızda işleyebilirsiniz. Aşağıda, en son sürüm modunu kullanarak geçerli zamanda başlayarak tüm değişiklikleri okumaya başlayan bir örnek verilmişti:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Değişiklik akışı, gelecekteki tüm yazma ve güncelleştirmeleri kapsayan sonsuz bir öğe listesi olduğundan, değeri HasMoreResults
her zaman true
olur. Değişiklik akışını okumaya çalıştığınızda ve kullanılabilir yeni bir değişiklik olmadığında, durum bilgisi olan NotModified
bir yanıt alırsınız. Yukarıdaki örnekte, değişiklikler yeniden denetlenmeden önce beş saniye beklenerek işlenir.
Bölüm anahtarı için değişiklikleri kullanma
Bazı durumlarda, yalnızca belirli bir bölüm anahtarına yönelik değişiklikleri işlemek isteyebilirsiniz. Belirli bir bölüm anahtarı için alabilir FeedIterator
ve değişiklikleri kapsayıcının tamamında yaptığınız gibi işleyebilirsiniz.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Paralelleştirme için FeedRange kullanma
Değişiklik akışı işlemcisinde, çalışma otomatik olarak birden çok tüketiciye yayılır. Değişiklik akışı çekme modelinde, değişiklik akışının işlenmesini paralelleştirmek için öğesini kullanabilirsiniz FeedRange
. A FeedRange
, bölüm anahtarı değerleri aralığını temsil eder.
Kapsayıcınız için aralıkların listesinin nasıl alındığını gösteren bir örnek aşağıda verilmiştir:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Kapsayıcınız için değerlerin FeedRange
listesini aldığınızda, fiziksel bölüm başına bir FeedRange
tane alırsınız.
kullanarak FeedRange
, değişiklik akışının birden çok makine veya iş parçacığı arasında işlenmesini paralelleştirmek için bir FeedIterator
oluşturabilirsiniz. Kapsayıcının tamamı için veya tek bir bölüm anahtarının nasıl alındığını FeedIterator
gösteren önceki örnekten farklı olarak, değişiklik akışını paralel olarak işleyebilen birden çok FeedIterator elde etmek için FeedRanges kullanabilirsiniz.
FeedRanges kullanmak istediğiniz durumlarda, FeedRanges'i alan ve bunları bu makinelere dağıtan bir düzenleyici işlemine sahip olmanız gerekir. Bu dağıtım şöyle olabilir:
- Bu dize değerini kullanma
FeedRange.ToJsonString
ve dağıtma. Tüketiciler bu değeri ileFeedRange.FromJsonString
kullanabilir. - Dağıtım devam ediyorsa, nesne başvuruyu
FeedRange
geçirin.
Aşağıda, paralel olarak okunan iki varsayımsal ayrı makine kullanılarak kapsayıcının değişiklik akışının başından itibaren nasıl okunduğunu gösteren bir örnek verilmiştir:
1. Makine:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
2. Makine:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Devamlılık belirteçlerini kaydetme
Devamlılık belirtecini alarak konumunuzu FeedIterator
kaydedebilirsiniz. Devamlılık belirteci, FeedIterator'ınızın son işlenen değişikliklerini izleyen ve daha sonra bu noktada sürdürülmesini sağlayan bir dize değeridir FeedIterator
. Devamlılık belirteci belirtilirse, başlangıç zamanından önceliklidir ve başlangıç değerlerinden başlar. Aşağıdaki kod, kapsayıcı oluşturma işleminden bu yana değişiklik akışını okur. Daha fazla değişiklik sağlandıktan sonra, değişiklik akışı tüketiminin daha sonra sürdürülebilmesi için bir devamlılık belirteci kalıcı hale gelir.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
En son sürüm modunu kullanırken, Azure Cosmos DB kapsayıcısı FeedIterator
hala mevcut olduğu sürece devamlılık belirtecinin süresi hiçbir zaman dolmaz. Tüm sürümleri ve silme modunu kullanırken, FeedIterator
sürekli yedeklemeler için bekletme penceresinde değişiklikler olduğu sürece devamlılık belirteci geçerli olur.