Megosztás a következőn keresztül:


Változáscsatorna az Apache Cassandra-hoz készült Azure Cosmos DB-ben

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Az Apache Cassandra-hoz készült Azure Cosmos DB változáscsatorna-támogatása a Cassandra lekérdezési nyelv (CQL) lekérdezési predikátumain keresztül érhető el. Ezekkel a predikátumfeltételekkel lekérdezheti a változáscsatorna API-t. Az alkalmazások a CQL-ben szükséges elsődleges kulccsal (más néven partíciókulcs) lekérhetik a táblák módosításait. Ezután további műveleteket is végrehajthat az eredmények alapján. A táblázat sorainak módosításait a rendszer a módosítási idő és a partíciókulcsonkénti rendezési sorrend szerint rögzíti.

Az alábbi példa bemutatja, hogyan szerezhet be változáscsatornát a Cassandra Keyspace-tábla API-jának összes sorára a .NET használatával. A COSMOS_CHANGEFEED_START_TIME() predikátumot a rendszer közvetlenül a CQL-ben használja a változáscsatorna elemeinek lekérdezéséhez egy megadott kezdési időpontból (ebben az esetben az aktuális dátumidőből). A teljes mintát itt töltheti le a C#-hoz és a Java-hoz.

Minden iterációban a lekérdezés az utolsó pontmódosítások olvasása után folytatódik lapozási állapottal. A kulcstérben folyamatosan látható a tábla új módosításainak folyamatos adatfolyama. Látni fogjuk a beszúrt vagy frissített sorok módosításait. A Cassandra API változáscsatornát használó törlési műveleteinek figyelése jelenleg nem támogatott.

Feljegyzés

Ha egy gyűjtemény elvetése után újból használatba ad egy jogkivonatot, majd ugyanazzal a névvel újrahasználja, az hibát eredményez. Javasoljuk, hogy új gyűjtemény létrehozásakor és a gyűjtemény nevének újbóli használatakor állítsa a pageState értéket null értékre.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Ha egyetlen sor módosításait elsődleges kulcs alapján szeretné lekérni, hozzáadhatja az elsődleges kulcsot a lekérdezéshez. Az alábbi példa bemutatja, hogyan követheti nyomon a "user_id = 1" sor módosításait

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Jelenlegi korlátozások

A változáscsatorna Cassandra API-val való használatakor a következő korlátozások érvényesek:

  • A beszúrások és frissítések jelenleg támogatottak. A törlési művelet még nem támogatott. Áthidaló megoldásként hozzáadhat egy helyreállítható jelölőt a törölt sorokhoz. Adjon hozzá például egy "törölt" nevű mezőt a sorba, és állítsa "igaz" értékre.
  • A legutóbbi frissítés megmarad, mint a NoSQL alapvető API-jában, és az entitás köztes frissítései nem érhetők el.

Hibakezelés

A Cassandra API-ban a változáscsatorna használatakor a következő hibakódok és üzenetek támogatottak:

  • HTTP-hibakód: 429 – Ha a változáscsatorna sebessége korlátozott, üres lapot ad vissza.

Következő lépések