Feliratkozás a Google Pub/Sub szolgáltatásra
Az Azure Databricks egy beépített összekötőt biztosít a Google Pub/Sub előfizetéséhez a Databricks Runtime 13.3 LTS-ben és újabb verziókban. Ez az összekötő pontosan egyszeri feldolgozási szemantikát biztosít az előfizető rekordjaihoz.
Feljegyzés
A Pub/Sub ismétlődő rekordokat tehet közzé, és előfordulhat, hogy a rekordok sorrenden kívül érkeznek az előfizetőhöz. Meg kell írnia az Azure Databricks-kódot a duplikált és a rendelésen kívüli rekordok kezeléséhez.
Példa szintaxisra
Az alábbi kódpéldából megismerheti a Strukturált streamelési olvasás konfigurálásának alapszintaxisát a Pub/Sub szolgáltatásból:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
További konfigurációs beállításokért tekintse meg a Pub/Sub streaming olvasási beállításainak konfigurálását.
Pubhoz/alhoz való hozzáférés konfigurálása
Az alábbi táblázat a konfigurált hitelesítő adatokhoz szükséges szerepköröket ismerteti:
Szerepkörök | Kötelező vagy választható | A használat menete |
---|---|---|
roles/pubsub.viewer vagy roles/viewer |
Kötelező | Ellenőrizze, hogy létezik-e előfizetés, és szerezze be az előfizetést |
roles/pubsub.subscriber |
Kötelező | Adatok lekérése előfizetésből |
roles/pubsub.editor vagy roles/editor |
Választható | Engedélyezi az előfizetés létrehozását, ha az nem létezik, és lehetővé teszi az deleteSubscriptionOnStreamStop előfizetések törlését a stream leállításakor |
A Databricks a titkos kódok használatát javasolja az engedélyezési lehetőségek megadásakor. A kapcsolat engedélyezéséhez a következő lehetőségek szükségesek:
clientEmail
clientId
privateKey
privateKeyId
Pub/Alséma
A stream sémája megegyezik a Pub/Sub szolgáltatásból lekért rekordokkal, az alábbi táblázatban leírtak szerint:
Mező | Típus |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
A Pub/Sub streaming olvasási beállításainak konfigurálása
Az alábbi táblázat a Pub/Sub esetében támogatott beállításokat ismerteti. Minden beállítás konfigurálva van egy strukturált streamelési olvasás részeként szintaxis használatával .option("<optionName>", "<optionValue>")
.
Feljegyzés
Egyes pub-/alkonfigurációs beállítások a lekérések fogalmát használják a mikrokötegek helyett. Ez a belső megvalósítás részleteit tükrözi, és a beállítások a többi strukturált stream-összekötőben lévő társrollokhoz hasonlóan működnek, kivéve, hogy a rekordok lekérése és feldolgozása történik.
Lehetőség | Alapértelmezett érték | Leírás |
---|---|---|
numFetchPartitions |
Állítsa be a stream inicializálása során jelen lévő végrehajtók számának felét. | Az előfizetésből rekordokat lekérő párhuzamos Spark-feladatok száma. |
deleteSubscriptionOnStreamStop |
false |
Ha true a streamnek átadott előfizetés a streamelési feladat befejeződésekor törlődik. |
maxBytesPerTrigger |
Nincs | Az egyes aktivált mikrokötegek során feldolgozandó köteg méretének korlátja. |
maxRecordsPerFetch |
1000 | A rekordok feldolgozása előtt lekérendő rekordok száma tevékenységenként. |
maxFetchPeriod |
10 másodperc | Az egyes tevékenységek beolvasásának időtartama a rekordok feldolgozása előtt. A Databricks az alapértelmezett érték használatát javasolja. |
Növekményes kötegfeldolgozás szemantikája Pub/Sub esetén
Egy növekményes köteget használhat Trigger.AvailableNow
a Pub/Alforrások elérhető rekordjainak felhasználásához.
Az Azure Databricks rögzíti az időbélyeget, amikor elkezd olvasni a Trigger.AvailableNow
beállítással. A köteg által feldolgozott rekordok tartalmazzák az összes korábban lekért adatot és az újonnan közzétett rekordokat, és a rögzített stream kezdési időbélyegénél kisebb időbélyeggel.
Lásd a növekményes kötegfeldolgozás konfigurálását.
Streamelési metrikák monitorozása
A strukturált streamelési folyamat mérőszámai a lehívott és a feldolgozásra kész rekordok számát, a lekért és a feldolgozásra kész rekordok méretét, valamint a stream kezdete óta látott ismétlődések számát jelentik. Az alábbi példák a következő metrikákra mutatnak be példát:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Korlátozások
A spekulatív végrehajtás (spark.speculation
) nem támogatott a Pub/Sub esetében.