Aracılığıyla paylaş


Akış verilerini sorgulama

Yapılandırılmış Akış kullanarak akış veri kaynaklarını sorgulamak için Azure Databricks'i kullanabilirsiniz. Azure Databricks, Python ve Scala'da akış iş yükleri için kapsamlı destek sağlar ve SQL ile Çoğu Yapılandırılmış Akış işlevini destekler.

Aşağıdaki örneklerde, not defterlerinde etkileşimli geliştirme sırasında akış verilerinin el ile incelenmesi için bellek havuzu kullanımı gösterilmektedir. Not defteri kullanıcı arabirimindeki satır çıktı sınırları nedeniyle, akış sorguları tarafından okunan tüm verileri gözlemlemeyebilirsiniz. Üretim iş yüklerinde, akış sorgularını yalnızca bir hedef tabloya veya dış sisteme yazarak tetiklemeniz gerekir.

Not

Akış verileriyle ilgili etkileşimli sorgular için SQL desteği, tüm amaçlı işlemlerde çalışan not defterleriyle sınırlıdır. Databricks SQL veya DLT'de akış tabloları bildirirken SQL de kullanabilirsiniz. Bkz. Databricks SQL'de akış tablolarını kullanarak veri yükleme ve DLT nedir?.

Akış sistemlerinden verileri sorgulama

Azure Databricks, aşağıdaki akış sistemleri için akış veri okuyucuları sağlar:

  • Kafka
  • Kinezi
  • PubSub
  • Pulsar

Bu sistemlere yönelik sorguları başlatırken, seçtiğiniz ortama ve okumak istediğiniz sisteme bağlı olarak değişen yapılandırma ayrıntılarını sağlamanız gerekir. Bkz Akış veri kaynaklarını yapılandırma.

Akış sistemlerini içeren yaygın iş yükleri, lakehouse'a veri alma ve verileri dış sistemlere aktarmak için akış işlemesini içerir. Akış iş yükleri hakkında daha fazla bilgi için Azure Databricks'te Akış konusuna bakın.

Aşağıdaki örneklerde Kafka'dan okunan etkileşimli bir akış gösterilmektedir:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Tabloyu akış okuma yöntemiyle sorgulama

Azure Databricks varsayılan olarak Delta Lake kullanarak tüm tabloları oluşturur. Delta tablosunda akış sorgusu gerçekleştirdiğinizde, tablonun bir sürümü işlendiğinde sorgu yeni kayıtları otomatik olarak alır. Varsayılan olarak, akış sorguları kaynak tabloların yalnızca eklenen kayıtları içermesini bekler. Güncelleştirmeleri ve silmeleri içeren akış verileriyle çalışmanız gerekiyorsa, Databricks DLT ve APPLY CHANGES INTOkullanmanızı önerir. Bkz. DEĞIŞIKLIKLERI UYGULA API'leri: DLTile değişiklik verilerini yakalamayı basitleştirme.

Aşağıdaki örnekler, bir tablodan okunan etkileşimli bir akış gerçekleştirmeyi gösterir:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Otomatik Yükleyici ile bulut nesne depolamasında verileri sorgulama

Azure Databricks bulut veri bağlayıcısı olan Otomatik Yükleyici'yi kullanarak bulut nesne depolama alanından veri akışı yapabilirsiniz. Bağlayıcıyı Unity Kataloğu birimlerinde veya diğer bulut nesne depolama konumlarında depolanan dosyalarla kullanabilirsiniz. Databricks, bulut nesne depolamadaki verilere erişimi yönetmek için birimlerin kullanılmasını önerir. Bkz . Veri kaynaklarına bağlanma.

Azure Databricks, popüler biçimlendirilmiş, yarı biçimlendirilmiş ve biçimlendirilmemiş biçimlerde depolanan bulut nesne depolama verilerinin sürekli olarak alımı için bu bağlayıcıyı optimize eder. Databricks, bozuk kayıtlar veya şema değişiklikleri nedeniyle aktarım hızını en üst düzeye çıkarmak ve olası veri kaybını en aza indirmek için alınan verilerin neredeyse ham biçimde depolanmasını önerir.

Bulut nesne depolama alanından veri alma hakkında daha fazla öneri için, Azure Databricks lakehouse'a veri alma süreçlerini ve'de bulabilirsiniz.

Aşağıdaki örneklerde bir birimdeki JSON dosyalarının dizininden okunan etkileşimli bir akış gösterilmektedir:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')