Google Pub/Sub'a abone olma
Azure Databricks, Databricks Runtime 13.3 LTS ve üzerinde Google Pub/Sub'a abone olmak için yerleşik bir bağlayıcı sağlar. Bu bağlayıcı, aboneden gelen kayıtlar için tam olarak bir kez işleme semantiği sağlar.
Not
Pub/Sub yinelenen kayıtlar yayımlayabilir ve kayıtlar aboneye sırasız gelebilir. Yinelenen ve sıra dışı kayıtları işlemek için Azure Databricks kodu yazmanız gerekir.
Söz dizimi örneği
Aşağıdaki kod örneği, Pub/Sub'dan okunan yapılandırılmış akış yapılandırmaya yönelik temel söz dizimini gösterir:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Daha fazla yapılandırma seçeneği için bkz . Pub/Sub akış okuma seçeneklerini yapılandırma.
Pub/Sub erişimi yapılandırma
Aşağıdaki tabloda, yapılandırılan kimlik bilgileri için gereken roller açıklanmaktadır:
Roller | Gerekli veya isteğe bağlı | Nasıl kullanılır? |
---|---|---|
roles/pubsub.viewer veya roles/viewer |
Zorunlu | Aboneliğin mevcut olup olmadığını denetleyin ve aboneliği alın |
roles/pubsub.subscriber |
Zorunlu | Abonelikten veri getirme |
roles/pubsub.editor veya roles/editor |
İsteğe bağlı | Yoksa aboneliğin oluşturulmasını ve akış sonlandırmada abonelikleri silmek için özelliğinin deleteSubscriptionOnStreamStop kullanılmasını etkinleştirir |
Databricks, yetkilendirme seçenekleri sağlarken gizli dizilerin kullanılmasını önerir. Bağlantıyı yetkilendirmek için aşağıdaki seçenekler gereklidir:
clientEmail
clientId
privateKey
privateKeyId
Pub/Sub şeması
Akışın şeması, aşağıdaki tabloda açıklandığı gibi Pub/Sub dosyasından getirilen kayıtlara eşleşiyor:
Alan | Tür |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Pub/Sub akış okuma seçeneklerini yapılandırma
Aşağıdaki tabloda Pub/Sub için desteklenen seçenekler açıklanmaktadır. Tüm seçenekler, söz dizimi kullanılarak .option("<optionName>", "<optionValue>")
Yapılandırılmış Akış okumasının bir parçası olarak yapılandırılır.
Not
Bazı Pub/Sub yapılandırma seçenekleri, mikro toplu işlemler yerine getirme kavramını kullanır. Bu, iç uygulama ayrıntılarını yansıtır ve seçenekler, kayıtların getirilip işlenmeleri dışında diğer Yapılandırılmış Akış bağlayıcılarındaki corollaries'lere benzer şekilde çalışır.
Seçenek | Varsayılan değer | Açıklama |
---|---|---|
numFetchPartitions |
Akış başlatma sırasında mevcut yürütücü sayısının yarısına ayarlayın. | Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı. |
deleteSubscriptionOnStreamStop |
false |
ise true , akış işi sona erdiğinde akışa geçirilen abonelik silinir. |
maxBytesPerTrigger |
yok | Tetiklenen her mikro toplu işlem sırasında işlenecek toplu iş boyutu için geçici sınır. |
maxRecordsPerFetch |
1000 | Kayıtları işlemeden önce görev başına getirilmeye çalışacak kayıt sayısı. |
maxFetchPeriod |
10 saniye | Kayıtları işlemeden önce her görevin getirilebilmesi için gereken süre. Databricks varsayılan değerin kullanılmasını önerir. |
Pub/Sub için artımlı toplu işleme semantiği
Pub/Sub kaynaklarından kullanılabilir kayıtları artımlı bir toplu iş olarak kullanmak için kullanabilirsiniz Trigger.AvailableNow
.
Azure Databricks, ayarıyla bir okumaya başladığınızda zaman damgasını Trigger.AvailableNow
kaydeder. Toplu işlem tarafından işlenen kayıtlar, daha önce getirilen tüm verileri ve kaydedilen akış başlangıç zaman damgasından daha az zaman damgasına sahip yeni yayımlanan kayıtları içerir.
Bkz . Artımlı toplu işlemeyi yapılandırma.
Akış ölçümlerini izleme
Yapılandırılmış Akış ilerlemesi ölçümleri getirilen ve işlemeye hazır kayıt sayısını, getirilen ve işlemeye hazır kayıtların boyutunu ve akış başlangıcından bu yana görülen yinelenenlerin sayısını bildirir. Aşağıda bu ölçümlere bir örnek verilmiştir:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Sınırlamalar
Pub/Sub ile tahmini yürütme (spark.speculation
) desteklenmez.