Megosztás a következőn keresztül:


Változáscsatorna feldolgozása az Azure Blob Storage-ban

A változáscsatorna tranzakciónaplókat biztosít a tárfiók blobjainak és blob metaadatainak változásairól. Ez a cikk bemutatja, hogyan olvashatja el a változáscsatorna rekordjait a blobmódosítási hírcsatorna processzortárával.

A változáscsatornával kapcsolatos további információkért lásd : Változáscsatorna az Azure Blob Storage-ban.

A projekt beállítása

Ez a szakasz végigvezeti egy projekt előkészítésén a .NET blobok változáscsatorna-ügyfélkódtárával való együttműködésre.

Csomagok telepítése

A projektkönyvtárból telepítse az Azure Storage Blobs Change Feed ügyféloldali kódtárának csomagját a .NET-hez a dotnet add package paranccsal. Ebben a példában hozzáadjuk a --prerelease jelölőt a parancshoz a legújabb előzetes verzió telepítéséhez.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

A cikkben szereplő kód példák az Azure Blob Storage- és az Azure Identity-csomagokat is használják.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Irányelvek hozzáadása using

Adja hozzá a következő using irányelveket a kódfájlhoz:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Ügyfélobjektum létrehozása

Az alkalmazás Blob Storage-hoz való csatlakoztatásához hozza létre az osztály egy példányát BlobServiceClient . Az alábbi példa bemutatja, hogyan hozhat létre ügyfélobjektumot DefaultAzureCredential engedélyezés céljából. További információ: Hozzáférés engedélyezése és csatlakozás a Blob Storage-hoz. A változáscsatorna használatához az Azure RBAC beépített Storage Blob-adatolvasó vagy újabb szerepkörre van szüksége.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

Az ügyfélobjektum paraméterként lesz átadva a cikkben bemutatott módszerek némelyikének.

Rekordok olvasása a változáscsatornában

Feljegyzés

A változáscsatorna egy nem módosítható és írásvédett entitás a tárfiókban. Tetszőleges számú alkalmazás képes egyszerre és egymástól függetlenül olvasni és feldolgozni a változáscsatornát a saját kényelmük érdekében. A rekordok nem lesznek eltávolítva a változáscsatornából, amikor egy alkalmazás felolvassa őket. Az egyes felhasználók olvasási vagy iterációs állapota független, és csak az alkalmazás tartja karban.

Az alábbi példakód végigvezeti a változáscsatorna összes rekordját, hozzáadja őket egy listához, majd visszaadja a változáscsatorna-események listáját:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Az alábbi kódpéldában a változáscsatorna-események listájából nyomtat ki néhány értéket:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Rekordok olvasásának folytatása mentett helyről

Dönthet úgy, hogy menti az olvasási pozíciót a változáscsatornában, majd később folytathatja az iterálást a rekordok között. Az olvasási pozíciót a változáscsatorna kurzorának lekérésével mentheti. A kurzor egy sztring, és az alkalmazás bármilyen módon mentheti ezt a sztringet, ami az alkalmazás tervezésének, például egy fájlnak vagy adatbázisnak a szempontjából logikus.

Ez a példa végigvezeti a változáscsatorna összes rekordján, hozzáadja őket egy listához, és menti a kurzort. A rendszer visszaadja a listát és a kurzort a hívónak.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Rekordok streamfeldolgozása

Dönthet úgy, hogy feldolgozzák a változáscsatorna rekordjait, mivel azok elkötelezettek a változáscsatorna iránt. Lásd a specifikációkat. A változásesemények átlagosan 60 másodperces időtartamban jelennek meg a változáscsatornában. Javasoljuk, hogy a szavazási időköz megadásakor az adott időszakkal kapcsolatos új módosításokat kérdezze le.

Ez a példa rendszeresen lekérdezi a módosításokat. Ha változásrekordok léteznek, ez a kód feldolgozza ezeket a rekordokat, és menti a változáscsatorna kurzorát. Így ha a folyamat leáll, majd újraindul, az alkalmazás a kurzor használatával folytathatja a rekordfeldolgozást, ahol utoljára abbahagyta. Ez a példa bemutató céljából menti a kurzort egy helyi fájlba, de az alkalmazás bármilyen formában mentheti, ami a legérthetőbb az Ön forgatókönyvéhez.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Rekordok olvasása egy adott időtartományon belül

Egy adott időtartományba eső rekordokat olvashat. Ez a példa végigvezeti a változáscsatorna minden rekordján, amely egy adott dátum- és időtartományba esik, hozzáadja őket egy listához, és visszaadja a listát:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

A megadott kezdési időpont le lesz kerekítve a legközelebbi órára, a befejezési időpont pedig felfelé kerekítve a legközelebbi órára. Előfordulhat, hogy a felhasználók a kezdési időpont előtt és a befejezés után is láthatják az eseményeket. Előfordulhat az is, hogy bizonyos események, amelyek a kezdési és a befejezési időpont között történnek, nem jelennek meg. Ennek az az oka, hogy az események a kezdési időpontot megelőző vagy a befejezés utáni órában rögzíthetők.

Következő lépések