Változáscsatorna az Apache Cassandra-hoz készült Azure Cosmos DB-ben
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Az Apache Cassandra-hoz készült Azure Cosmos DB változáscsatorna-támogatása a Cassandra lekérdezési nyelv (CQL) lekérdezési predikátumain keresztül érhető el. Ezekkel a predikátumfeltételekkel lekérdezheti a változáscsatorna API-t. Az alkalmazások a CQL-ben szükséges elsődleges kulccsal (más néven partíciókulcs) lekérhetik a táblák módosításait. Ezután további műveleteket is végrehajthat az eredmények alapján. A táblázat sorainak módosításait a rendszer a módosítási idő és a partíciókulcsonkénti rendezési sorrend szerint rögzíti.
Az alábbi példa bemutatja, hogyan szerezhet be változáscsatornát a Cassandra Keyspace-tábla API-jának összes sorára a .NET használatával. A COSMOS_CHANGEFEED_START_TIME() predikátumot a rendszer közvetlenül a CQL-ben használja a változáscsatorna elemeinek lekérdezéséhez egy megadott kezdési időpontból (ebben az esetben az aktuális dátumidőből). A teljes mintát itt töltheti le a C#-hoz és a Java-hoz.
Minden iterációban a lekérdezés az utolsó pontmódosítások olvasása után folytatódik lapozási állapottal. A kulcstérben folyamatosan látható a tábla új módosításainak folyamatos adatfolyama. Látni fogjuk a beszúrt vagy frissített sorok módosításait. A Cassandra API változáscsatornát használó törlési műveleteinek figyelése jelenleg nem támogatott.
Feljegyzés
Ha egy gyűjtemény elvetése után újból használatba ad egy jogkivonatot, majd ugyanazzal a névvel újrahasználja, az hibát eredményez. Javasoljuk, hogy új gyűjtemény létrehozásakor és a gyűjtemény nevének újbóli használatakor állítsa a pageState értéket null értékre.
Session cassandraSession = utils.getSession();
try {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);
String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
byte[] token=null;
System.out.println(query);
while(true)
{
SimpleStatement st=new SimpleStatement(query);
st.setFetchSize(100);
if(token!=null)
st.setPagingStateUnsafe(token);
ResultSet result=cassandraSession.execute(st) ;
token=result.getExecutionInfo().getPagingState().toBytes();
for(Row row:result)
{
System.out.println(row.getString("user_name"));
}
}
} finally {
utils.close();
LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
}
Ha egyetlen sor módosításait elsődleges kulcs alapján szeretné lekérni, hozzáadhatja az elsődleges kulcsot a lekérdezéshez. Az alábbi példa bemutatja, hogyan követheti nyomon a "user_id = 1" sor módosításait
String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
SimpleStatement st=new SimpleStatement(query);
Jelenlegi korlátozások
A változáscsatorna Cassandra API-val való használatakor a következő korlátozások érvényesek:
- A beszúrások és frissítések jelenleg támogatottak. A törlési művelet még nem támogatott. Áthidaló megoldásként hozzáadhat egy helyreállítható jelölőt a törölt sorokhoz. Adjon hozzá például egy "törölt" nevű mezőt a sorba, és állítsa "igaz" értékre.
- A legutóbbi frissítés megmarad, mint a NoSQL alapvető API-jában, és az entitás köztes frissítései nem érhetők el.
Hibakezelés
A Cassandra API-ban a változáscsatorna használatakor a következő hibakódok és üzenetek támogatottak:
- HTTP-hibakód: 429 – Ha a változáscsatorna sebessége korlátozott, üres lapot ad vissza.