Sdílet prostřednictvím


Přihlášení k odběru Google Pub/Sub

Azure Databricks poskytuje integrovaný konektor pro přihlášení k odběru Google Pub/Sub v Databricks Runtime 13.3 LTS a novějších. Tento konektor poskytuje sémantiku zpracování přesně jednou pro záznamy od odběratele.

Poznámka:

Pub/Sub může publikovat duplicitní záznamy a záznamy můžou přijít odběrateli z objednávky. Měli byste napsat kód Azure Databricks pro zpracování duplicitních a zastaralých záznamů.

Příklad syntaxe

Následující příklad kódu ukazuje základní syntaxi pro konfiguraci strukturovaného streamování přečtené z pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Další možnosti konfigurace najdete v tématu Konfigurace možností pro čtení pub/dílčího streamování.

Konfigurace přístupu k pub/Sub

Následující tabulka popisuje role požadované pro nakonfigurované přihlašovací údaje:

Role Požadované nebo volitelné Jak se používá
roles/pubsub.viewer nebo roles/viewer Požaduje se Kontrola, jestli předplatné existuje, a získání předplatného
roles/pubsub.subscriber Požaduje se Načtení dat z předplatného
roles/pubsub.editor nebo roles/editor Volitelné Umožňuje vytvoření předplatného v případě, že neexistuje, a také umožňuje odstranit předplatná při ukončení streamu deleteSubscriptionOnStreamStop .

Databricks doporučuje používat tajné kódy při poskytování možností autorizace. K autorizaci připojení se vyžadují následující možnosti:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Sub schéma

Schéma datového proudu odpovídá záznamům načteným z pub/Sub, jak je popsáno v následující tabulce:

Pole Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurace možností čtení pub/sub streamingu

Následující tabulka popisuje možnosti podporované pro Pub/Sub. Všechny možnosti jsou nakonfigurované jako součást syntaxe čtení .option("<optionName>", "<optionValue>") strukturovaného streamování.

Poznámka:

Některé možnosti konfigurace Pub/Sub používají místo mikrodávek koncept načítání. To odráží podrobnosti interní implementace a možnosti fungují podobně jako spolurolly v jiných konektorech strukturovaného streamování s tím rozdílem, že se načítají a pak zpracovávají záznamy.

Možnost Výchozí hodnota Popis
numFetchPartitions Nastavte na polovinu počtu výkonných procesů, které jsou přítomny při inicializaci datového proudu. Početparalelních
deleteSubscriptionOnStreamStop false Pokud truese předplatné předané do datového proudu odstraní, jakmile úloha streamování skončí.
maxBytesPerTrigger Žádná Měkký limit velikosti dávky, která se má zpracovat během každé aktivované mikrodávkové dávky.
maxRecordsPerFetch 1000 Počet záznamů, které se mají načíst na každou úlohu před zpracováním záznamů.
maxFetchPeriod 10 sekund Doba trvání každého úkolu, která se má načíst před zpracováním záznamů. Databricks doporučuje použít výchozí hodnotu.

Sémantika přírůstkového dávkového zpracování pro pub/sub

K využívání dostupných záznamů ze zdrojů Pub/Sub můžete použít Trigger.AvailableNow přírůstkovou dávku.

Azure Databricks zaznamenává časové razítko při zahájení čtení s Trigger.AvailableNow nastavením. Záznamy zpracovávané dávkou zahrnují všechna dříve načtená data a všechny nově publikované záznamy s časovým razítkem kratším než časové razítko spuštění zaznamenaného datového proudu.

Viz Konfigurace přírůstkového dávkového zpracování.

Monitorování metrik streamování

Metriky průběhu strukturovaného streamování hlásí počet načtených a připravených záznamů ke zpracování, velikost načtených a připravených záznamů ke zpracování a počet duplicit zobrazených od spuštění datového proudu. Následuje příklad těchto metrik:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Omezení

Spekulativní spuštění (spark.speculation) není podporováno u pub/Sub.