Přihlášení k odběru Google Pub/Sub
Azure Databricks poskytuje integrovaný konektor pro přihlášení k odběru Google Pub/Sub v Databricks Runtime 13.3 LTS a novějších. Tento konektor poskytuje sémantiku zpracování přesně jednou pro záznamy od odběratele.
Poznámka:
Pub/Sub může publikovat duplicitní záznamy a záznamy můžou přijít odběrateli z objednávky. Měli byste napsat kód Azure Databricks pro zpracování duplicitních a zastaralých záznamů.
Příklad syntaxe
Následující příklad kódu ukazuje základní syntaxi pro konfiguraci strukturovaného streamování přečtené z pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Další možnosti konfigurace najdete v tématu Konfigurace možností pro čtení pub/dílčího streamování.
Konfigurace přístupu k pub/Sub
Následující tabulka popisuje role požadované pro nakonfigurované přihlašovací údaje:
Role | Požadované nebo volitelné | Jak se používá |
---|---|---|
roles/pubsub.viewer nebo roles/viewer |
Požaduje se | Kontrola, jestli předplatné existuje, a získání předplatného |
roles/pubsub.subscriber |
Požaduje se | Načtení dat z předplatného |
roles/pubsub.editor nebo roles/editor |
Volitelné | Umožňuje vytvoření předplatného v případě, že neexistuje, a také umožňuje odstranit předplatná při ukončení streamu deleteSubscriptionOnStreamStop . |
Databricks doporučuje používat tajné kódy při poskytování možností autorizace. K autorizaci připojení se vyžadují následující možnosti:
clientEmail
clientId
privateKey
privateKeyId
Pub/Sub schéma
Schéma datového proudu odpovídá záznamům načteným z pub/Sub, jak je popsáno v následující tabulce:
Pole | Typ |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurace možností čtení pub/sub streamingu
Následující tabulka popisuje možnosti podporované pro Pub/Sub. Všechny možnosti jsou nakonfigurované jako součást syntaxe čtení .option("<optionName>", "<optionValue>")
strukturovaného streamování.
Poznámka:
Některé možnosti konfigurace Pub/Sub používají místo mikrodávek koncept načítání. To odráží podrobnosti interní implementace a možnosti fungují podobně jako spolurolly v jiných konektorech strukturovaného streamování s tím rozdílem, že se načítají a pak zpracovávají záznamy.
Možnost | Výchozí hodnota | Popis |
---|---|---|
numFetchPartitions |
Nastavte na polovinu počtu výkonných procesů, které jsou přítomny při inicializaci datového proudu. | Početparalelních |
deleteSubscriptionOnStreamStop |
false |
Pokud true se předplatné předané do datového proudu odstraní, jakmile úloha streamování skončí. |
maxBytesPerTrigger |
Žádná | Měkký limit velikosti dávky, která se má zpracovat během každé aktivované mikrodávkové dávky. |
maxRecordsPerFetch |
1000 | Počet záznamů, které se mají načíst na každou úlohu před zpracováním záznamů. |
maxFetchPeriod |
10 sekund | Doba trvání každého úkolu, která se má načíst před zpracováním záznamů. Databricks doporučuje použít výchozí hodnotu. |
Sémantika přírůstkového dávkového zpracování pro pub/sub
K využívání dostupných záznamů ze zdrojů Pub/Sub můžete použít Trigger.AvailableNow
přírůstkovou dávku.
Azure Databricks zaznamenává časové razítko při zahájení čtení s Trigger.AvailableNow
nastavením. Záznamy zpracovávané dávkou zahrnují všechna dříve načtená data a všechny nově publikované záznamy s časovým razítkem kratším než časové razítko spuštění zaznamenaného datového proudu.
Viz Konfigurace přírůstkového dávkového zpracování.
Monitorování metrik streamování
Metriky průběhu strukturovaného streamování hlásí počet načtených a připravených záznamů ke zpracování, velikost načtených a připravených záznamů ke zpracování a počet duplicit zobrazených od spuštění datového proudu. Následuje příklad těchto metrik:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Omezení
Spekulativní spuštění (spark.speculation
) není podporováno u pub/Sub.