Adatcsatorna lekérési modelljének módosítása az Azure Cosmos DB-ben
A KÖVETKEZŐRE VONATKOZIK: NoSQL
A változáscsatorna lekérési modelljének használatával saját tempójában használhatja az Azure Cosmos DB változáscsatornáját. A változáscsatorna-feldolgozóhoz hasonlóan a változáscsatorna lekérési modelljével párhuzamossá teheti a módosítások feldolgozását több változáscsatorna-fogyasztó között.
Összehasonlítás a változáscsatorna-feldolgozóval
Számos forgatókönyv feldolgozhatja a változáscsatornát a változáscsatorna-feldolgozó vagy a változáscsatorna lekérési modelljének használatával. A lekéréses modell folytatási jogkivonatai és a változáscsatorna-feldolgozó bérlettárolója egyaránt könyvjelzőként szolgál a változáscsatorna utolsó feldolgozott eleméhez vagy tételkötegéhez.
A folytatási jogkivonatokat azonban nem konvertálhatja bérletté, vagy fordítva.
Feljegyzés
A legtöbb esetben, amikor a változáscsatornából kell olvasnia, a legegyszerűbb megoldás a változáscsatorna-feldolgozó használata.
Érdemes megfontolni a lekéréses modell használatát az alábbi esetekben:
- Egy adott partíciókulcs módosításainak olvasása.
- Annak szabályozása, hogy az ügyfél milyen ütemben fogadja a módosításokat a feldolgozáshoz.
- A változáscsatorna meglévő adatainak egyszeri olvasásához (például adatmigráláshoz).
Íme néhány fontos különbség a változáscsatorna processzora és a változáscsatorna lekérési modellje között:
Szolgáltatás | Változáscsatorna feldolgozója | Változáscsatorna lekérési modellje |
---|---|---|
A változáscsatorna feldolgozásának aktuális pontjának nyomon követése | Bérlet (Azure Cosmos DB-tárolóban tárolva) | Folytatási jogkivonat (memóriában tárolva vagy manuálisan megőrzve) |
A korábbi módosítások visszajátszásának képessége | Igen, leküldéses modellel | Igen, lekéréses modellel |
Lekérdezés a jövőbeli változásokról | A felhasználó által megadott WithPollInterval érték alapján automatikusan ellenőrzi a módosításokat |
Manuális |
Olyan viselkedés, amelyben nincsenek új módosítások | Az érték automatikus várakozása WithPollInterval , majd újraellenőrzése |
Ellenőriznie kell az állapotot, és manuálisan újra kell ellenőriznie |
Változások feldolgozása egy teljes tárolóból | Igen, és automatikusan párhuzamosított több szál és gép között, amelyek ugyanazon tárolóból származnak | Igen, és manuálisan párhuzamosítva a következő használatával: FeedRange |
Csak egyetlen partíciókulcs módosításainak feldolgozása | Nem támogatott | Igen |
Feljegyzés
A lekéréses modell használata esetén – a változáscsatorna-feldolgozóval való olvasástól eltérően – kifejezetten kezelnie kell azokat az eseteket, amikor nincsenek új módosítások.
A lekéréses modell működése
A változáscsatorna lekéréses modellel történő feldolgozásához hozzon létre egy példányt.FeedIterator
Az első létrehozáskor FeedIterator
meg kell adnia egy kötelező ChangeFeedStartFrom
értéket, amely a módosítások olvasásának kezdő pozíciójából és a használni FeedRange
kívánt értékből áll. Ez FeedRange
a partíciókulcs-értékek tartománya, és meghatározza azokat az elemeket, amelyek az adott adatcsatornával FeedIterator
olvashatók a változáscsatornából. Meg kell adnia egy kötelező ChangeFeedMode
értéket ahhoz a módhoz is, amelyben a módosításokat feldolgozni szeretné: a legújabb verziót vagy az összes verziót és törlést. Használja vagy ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
jelezze, hogy melyik módot szeretné használni a változáscsatorna olvasásához. Ha az összes verziót és törlési módot használja, ki kell választania egy változáscsatornát, amely egy Now()
adott folytatási jogkivonat értékéből indul ki.
Igény szerint megadhatja ChangeFeedRequestOptions
, hogy be legyen-e állítva egy PageSizeHint
. Ha be van állítva, ez a tulajdonság beállítja az oldalonként fogadott elemek maximális számát. Ha a figyelt gyűjtemény műveletei tárolt eljárásokon keresztül történnek, a tranzakció hatóköre megmarad a változáscsatorna elemeinek beolvasásakor. Ennek eredményeképpen a fogadott elemek száma magasabb lehet a megadott értéknél, így az ugyanazon tranzakcióval módosított elemek egy atomi köteg részeként lesznek visszaadva.
Íme egy példa arra, hogyan szerezhető be FeedIterator
a legújabb verziómód, amely entitásobjektumokat ad vissza, ebben az esetben egy User
objektumot:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tipp.
A verzió 3.34.0
előtt a legújabb verziómód használható a beállítással ChangeFeedMode.Incremental
. LatestVersion
A Incremental
változáscsatorna legújabb verziómódjára és a mindkét módot használó alkalmazásokra is ugyanez a viselkedés fog vonatkozni.
Az összes verzió és törlési mód előzetes verzióban érhető el, és a .NET SDK előzetes verziójával >is használható . 3.32.0-preview
Íme egy példa az összes olyan verzió és törlési mód beszerzésére FeedIterator
, amely objektumokat User
ad vissza:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Feljegyzés
A legújabb verzió módban olyan objektumokat kap, amelyek a módosított elemet jelölik, néhány további metaadattal. Minden verzió és törlési mód egy másik adatmodellt ad vissza. További információ: A válaszobjektum elemzése.
Beszerezheti a teljes mintát a legújabb verziómódhoz, vagy az összes verzióhoz és törlési módhoz.
A változáscsatorna felhasználása streameken keresztül
FeedIterator
mindkét változáscsatorna-módhoz két lehetőség áll rendelkezésre. Az entitásobjektumokat visszaadandó példák mellett a választ is beszerezheti támogatással Stream
. A streamek lehetővé teszik az adatok olvasását anélkül, hogy azokat először deszerializálni kellene, így az ügyfélerőforrásokra menthet.
Íme egy példa a legújabb verzió módban való beszerzésére FeedIterator
, amely a következőt adja Stream
vissza:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
A teljes tároló módosításainak felhasználása
Ha nem ad meg paramétert FeedRange
FeedIterator
, saját ütemben feldolgozhatja egy teljes tároló változáscsatornáját. Íme egy példa, amely elkezdi olvasni az összes módosítást a jelenlegi időponttól kezdve a legújabb verziómód használatával:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Mivel a változáscsatorna gyakorlatilag az összes jövőbeli írást és frissítést magában foglaló elemek végtelen listája, az érték HasMoreResults
mindig true
. Amikor megpróbálja elolvasni a változáscsatornát, és nincsenek új módosítások, állapotú NotModified
választ kap. Az előző példában úgy kezeli a rendszer, hogy öt másodpercet vár a módosítások újbóli ellenőrzése előtt.
Partíciókulcs módosításainak felhasználása
Bizonyos esetekben előfordulhat, hogy csak egy adott partíciókulcs módosításait szeretné feldolgozni. Beszerezhet FeedIterator
egy adott partíciókulcsot, és ugyanúgy dolgozhatja fel a módosításokat, mint egy teljes tároló esetében.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
A FeedRange használata párhuzamosításhoz
A változáscsatorna-feldolgozóban a munka automatikusan több felhasználó között oszlik meg. A változáscsatorna lekérési modelljében a FeedRange
változáscsatorna feldolgozásának párhuzamossá gombra is használhatja. Az A FeedRange
a partíciókulcs értékeinek tartományát jelöli.
Íme egy példa, amely bemutatja, hogyan szerezheti be a tároló tartományainak listáját:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Amikor lekéri a tároló értékeinek listájátFeedRange
, fizikai partíciónként egyet FeedRange
kap.
A módosítási FeedRange
hírcsatorna több gépen vagy szálon történő feldolgozását párhuzamossá teheti FeedIterator
. Az előző példától eltérően, amely azt mutatta be, hogyan szerezhet be egy FeedIterator
teljes tárolót vagy egy partíciókulcsot, a FeedRanges használatával több FeedIteratort is beszerezhet, amelyek párhuzamosan feldolgozhatják a változáscsatornát.
Abban az esetben, ha a FeedRanges-t szeretné használni, rendelkeznie kell egy vezénylési eljárással, amely lekéri a FeedRanges-eket, és elosztja őket az adott gépeken. Ez a disztribúció a következő lehet:
- A sztringérték használata
FeedRange.ToJsonString
és terjesztése. A felhasználók ezt az értéket használhatják a következővelFeedRange.FromJsonString
: . - Ha az eloszlás folyamatban van, adja át az
FeedRange
objektumhivatkozást.
Íme egy példa, amely bemutatja, hogyan olvasható be a tároló változáscsatornájának elejétől két, párhuzamosan olvasható, hipotetikus különálló gép:
1. gép:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
2. gép:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Folytatási jogkivonatok mentése
A folytatási jogkivonat beszerzésével mentheti a FeedIterator
pozícióját. A folytatási jogkivonat egy sztringérték, amely nyomon követi a FeedIterator utolsó feldolgozott módosításait, és lehetővé teszi a folytatást ezen a FeedIterator
ponton később. A folytatási jogkivonat, ha meg van adva, elsőbbséget élvez a kezdési időponttal szemben, és az első értékekből indul ki. A következő kód beolvassa a változáscsatornát a tároló létrehozása óta. Miután nem érhető el több módosítás, a rendszer megőrzi a folytatási jogkivonatot, hogy a változáscsatorna-felhasználás később újrainduljon.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
A legújabb verziómód használata esetén a FeedIterator
folytatási jogkivonat soha nem jár le, amíg az Azure Cosmos DB-tároló még létezik. Ha az összes verziót és törlési módot használja, a FeedIterator
folytatási jogkivonat akkor érvényes, ha a módosítások a folyamatos biztonsági mentések megőrzési időszakában történtek.