Változáscsatorna feldolgozása az Azure Blob Storage-ban
A változáscsatorna tranzakciónaplókat biztosít a tárfiók blobjainak és blob metaadatainak változásairól. Ez a cikk bemutatja, hogyan olvashatja el a változáscsatorna rekordjait a blobmódosítási hírcsatorna processzortárával.
A változáscsatornával kapcsolatos további információkért lásd : Változáscsatorna az Azure Blob Storage-ban.
A projekt beállítása
Ez a szakasz végigvezeti egy projekt előkészítésén a .NET blobok változáscsatorna-ügyfélkódtárával való együttműködésre.
Csomagok telepítése
A projektkönyvtárból telepítse az Azure Storage Blobs Change Feed ügyféloldali kódtárának csomagját a .NET-hez a dotnet add package
paranccsal. Ebben a példában hozzáadjuk a --prerelease
jelölőt a parancshoz a legújabb előzetes verzió telepítéséhez.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
A cikkben szereplő kód példák az Azure Blob Storage- és az Azure Identity-csomagokat is használják.
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Irányelvek hozzáadása using
Adja hozzá a következő using
irányelveket a kódfájlhoz:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Ügyfélobjektum létrehozása
Az alkalmazás Blob Storage-hoz való csatlakoztatásához hozza létre az osztály egy példányát BlobServiceClient
. Az alábbi példa bemutatja, hogyan hozhat létre ügyfélobjektumot DefaultAzureCredential
engedélyezés céljából. További információ: Hozzáférés engedélyezése és csatlakozás a Blob Storage-hoz. A változáscsatorna használatához az Azure RBAC beépített Storage Blob-adatolvasó vagy újabb szerepkörre van szüksége.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Az ügyfélobjektum paraméterként lesz átadva a cikkben bemutatott módszerek némelyikének.
Rekordok olvasása a változáscsatornában
Feljegyzés
A változáscsatorna egy nem módosítható és írásvédett entitás a tárfiókban. Tetszőleges számú alkalmazás képes egyszerre és egymástól függetlenül olvasni és feldolgozni a változáscsatornát a saját kényelmük érdekében. A rekordok nem lesznek eltávolítva a változáscsatornából, amikor egy alkalmazás felolvassa őket. Az egyes felhasználók olvasási vagy iterációs állapota független, és csak az alkalmazás tartja karban.
Az alábbi példakód végigvezeti a változáscsatorna összes rekordját, hozzáadja őket egy listához, majd visszaadja a változáscsatorna-események listáját:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Az alábbi kódpéldában a változáscsatorna-események listájából nyomtat ki néhány értéket:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Rekordok olvasásának folytatása mentett helyről
Dönthet úgy, hogy menti az olvasási pozíciót a változáscsatornában, majd később folytathatja az iterálást a rekordok között. Az olvasási pozíciót a változáscsatorna kurzorának lekérésével mentheti. A kurzor egy sztring, és az alkalmazás bármilyen módon mentheti ezt a sztringet, ami az alkalmazás tervezésének, például egy fájlnak vagy adatbázisnak a szempontjából logikus.
Ez a példa végigvezeti a változáscsatorna összes rekordján, hozzáadja őket egy listához, és menti a kurzort. A rendszer visszaadja a listát és a kurzort a hívónak.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Rekordok streamfeldolgozása
Dönthet úgy, hogy feldolgozzák a változáscsatorna rekordjait, mivel azok elkötelezettek a változáscsatorna iránt. Lásd a specifikációkat. A változásesemények átlagosan 60 másodperces időtartamban jelennek meg a változáscsatornában. Javasoljuk, hogy a szavazási időköz megadásakor az adott időszakkal kapcsolatos új módosításokat kérdezze le.
Ez a példa rendszeresen lekérdezi a módosításokat. Ha változásrekordok léteznek, ez a kód feldolgozza ezeket a rekordokat, és menti a változáscsatorna kurzorát. Így ha a folyamat leáll, majd újraindul, az alkalmazás a kurzor használatával folytathatja a rekordfeldolgozást, ahol utoljára abbahagyta. Ez a példa bemutató céljából menti a kurzort egy helyi fájlba, de az alkalmazás bármilyen formában mentheti, ami a legérthetőbb az Ön forgatókönyvéhez.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Rekordok olvasása egy adott időtartományon belül
Egy adott időtartományba eső rekordokat olvashat. Ez a példa végigvezeti a változáscsatorna minden rekordján, amely egy adott dátum- és időtartományba esik, hozzáadja őket egy listához, és visszaadja a listát:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
A megadott kezdési időpont le lesz kerekítve a legközelebbi órára, a befejezési időpont pedig felfelé kerekítve a legközelebbi órára. Előfordulhat, hogy a felhasználók a kezdési időpont előtt és a befejezés után is láthatják az eseményeket. Előfordulhat az is, hogy bizonyos események, amelyek a kezdési és a befejezési időpont között történnek, nem jelennek meg. Ennek az az oka, hogy az események a kezdési időpontot megelőző vagy a befejezés utáni órában rögzíthetők.