DLT ile veri yükleme
DLT kullanarak Azure Databricks üzerinde Apache Spark tarafından desteklenen herhangi bir veri kaynağından veri yükleyebilirsiniz. Spark DataFrame döndüren herhangi bir sorguya karşılık DLT'de veri kümeleri (tablolar ve görünümler) tanımlayabilirsiniz; buna akış DataFrame'leri ve Pandas için Spark DataFrame'leri de dahildir. Databricks, veri alımı görevleri için çoğu kullanım örneğinde akış tablolarının kullanılmasını önerir. Akış tabloları, Auto Loader kullanarak bulut nesne depolama alanından veya Kafka gibi mesaj ağlarından veri almak için idealdir. Aşağıdaki örneklerde bazı yaygın desenler gösterilmiştir.
Önemli
Tüm veri kaynaklarının SQL desteği yoktur. Sql ve Python not defterlerini bir DLT işlem hattında birleştirerek sql'i alımın ötesindeki tüm işlemler için kullanabilirsiniz.
Varsayılan olarak DLT'de paketlenmemiş kitaplıklarla çalışma hakkında ayrıntılı bilgi için bkz. DLT işlem hatları için Python bağımlılıklarını yönetme.
Dosyaları bulut nesne depolama alanından yükleme
Databricks, bulut nesne depolamasından veri alımı görevlerinin çoğu için DLT ile Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici ve DLT, bulut depolama alanına geldikçe sürekli büyüyen verileri artımlı ve eşsiz bir şekilde yükleyecek şekilde tasarlanmıştır. Aşağıdaki örneklerde CSV ve JSON dosyalarından veri kümeleri oluşturmak için Otomatik Yükleyici kullanılır:
Not
Unity Kataloğu etkin bir işlem hattında Otomatik Yükleyici ile dosya yüklemek için dış konumları kullanmanız gerekir. Unity Kataloğu'nu DLT ile kullanma hakkında daha fazla bilgi edinmek için bkz. Unity Kataloğu'nu DLT işlem hatlarınızla kullanma.
Piton
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Bakınız: Otomatik Yükleyici nedir? ve Otomatik Yükleyici SQL söz dizimi.
Uyarı
Dosya bildirimleriyle Otomatik Yükleyici'yi kullanır ve işlem hattınız veya akış tablonuz için tam yenileme çalıştırırsanız kaynaklarınızı el ile temizlemeniz gerekir. Temizleme gerçekleştirmek için not defterindeki CloudFilesResourceManager kullanabilirsiniz.
İleti veri yolu'ndan veri yükleme
DLT işlem hatlarını, akış tablolarıyla ileti veri kümelerinden veri almak için yapılandırabilirsiniz. Databricks, mesaj iletim hatlarından düşük gecikmeli yükleme için en verimli alımı sağlamak amacıyla akış tablolarını gelişmiş otomatik ölçeklendirme ve sürekli yürütme ile birleştirmenizi önerir. Bkz. Gelişmiş otomatik ölçeklendirmeile DLT işlem hatlarının küme kullanımını iyileştirme.
Örneğin, aşağıdaki kod Kafka'dan veri almak için bir akış tablosu yapılandırır:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
Aşağıdaki örnekte olduğu gibi, bu veriler üzerinde akış dönüştürmeleri gerçekleştirmek için saf SQL'de aşağı akış işlemleri yazabilirsiniz:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Event Hubs ile çalışma örneği için bkz. Azure Event Hubs'ı DLT veri kaynağı olarak kullanma.
Akış veri kaynaklarını yapılandırma için bkz. .
Dış sistemlerden veri yükleme
DLT, Azure Databricks tarafından desteklenen herhangi bir veri kaynağından veri yüklemeyi destekler. bkz. veri kaynaklarına bağlanma. Dış verileri, veri kaynaklarınıntarafından desteklendiği durumlarda Lakehouse Federasyonu kullanarak da yükleyebilirsiniz. Lakehouse Federation, Databricks Runtime 13.3 LTS veya üzerini gerektirdiğinden, Lakehouse Federation'ı kullanmak için işlem hattınızın önizleme kanalını kullanacak şekilde yapılandırılması gerekir.
Bazı veri kaynaklarının SQL'de eşdeğer desteği yoktur. Lakehouse Federation'u bu veri kaynaklarından biriyle kullanamıyorsanız, kaynaktan veri almak için Python not defteri kullanabilirsiniz. Python ve SQL kaynak kodunu aynı DLT işlem hattına ekleyebilirsiniz. Aşağıdaki örnek, uzak bir PostgreSQL tablosundaki verilerin geçerli durumuna erişmek için gerçekleştirilmiş bir görünüm bildirir:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Bulut nesne depolama alanından küçük veya statik veri kümeleri yükleme
Apache Spark yük söz dizimini kullanarak küçük veya statik veri kümelerini yükleyebilirsiniz. DLT, Azure Databricks üzerinde Apache Spark tarafından desteklenen tüm dosya biçimlerini destekler. Tam liste için bkz. Veri biçimi seçenekleri.
Aşağıdaki örneklerde DLT tabloları oluşturmak için JSON yükleme gösterilmektedir:
Piton
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Not
SELECT * FROM format.`path`;
SQL yapısı, Azure Databricks'te tüm SQL ortamları için ortaktır. DLT ile SQL kullanarak doğrudan dosya erişimi için önerilen desendir.
İşlem hattındaki gizli dizilerle depolama kimlik bilgilerine güvenli bir şekilde erişme
Erişim anahtarları veya parolalar gibi kimlik bilgilerini depolamak için Azure Databricks gizli dizileri kullanabilirsiniz. Gizliyi işlem hattınıza yapılandırmak için işlem hattı ayarları küme yapılandırmasında bir Spark özelliği kullanın. bkz. DLT işlem hattı için işlem yapılandırma.
Aşağıdaki örnek, Auto Loaderkullanarak Azure Data Lake Storage Gen2 (ADLS Gen2) depolama hesabından giriş verilerini okumak için gereken erişim anahtarını depolamak amacıyla bir gizli bilgi kullanır. İşlem hattınızın gerektirdiği gizli dizileri (örneğin, S3'e erişmek için AWS anahtarları veya apache Hive meta veri deposu parolası) yapılandırmak için aynı yöntemi kullanabilirsiniz.
Azure Data Lake Storage Nesil 2 ile çalışma hakkında daha fazla bilgi edinmek için bkz.: Azure Data Lake Storage Nesil 2 ve Blob Depolamaile bağlanma.
Not
gizli dizi değerini ayarlayan spark_conf
yapılandırma anahtarına spark.hadoop.
ön ekini eklemeniz gerekir.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Değiştirmek
- ADLS 2. Nesil depolama hesabı adıyla
<storage-account-name>
. - Azure Databricks gizli kapsam adı olan
<scope-name>
. - Azure depolama hesabı erişim anahtarını içeren anahtarın adı olarak
<secret-name>
.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Değiştirmek
-
<container-name>
giriş verilerini depolayan Azure depolama hesabı kapsayıcısının adıyla. - ADLS 2. Nesil depolama hesabı adıyla
<storage-account-name>
. - Giriş veri kümesine giden yol
<path-to-input-dataset>
ile.
Azure Event Hubs'dan veri yükleme
Azure Event Hubs, Apache Kafka uyumlu bir arabirim sağlayan bir veri akışı hizmetidir. Azure Event Hubs'dan ileti yüklemek için DLT çalışma zamanına dahil edilen Yapılandırılmış Akış Kafka bağlayıcısını kullanabilirsiniz. Azure Event Hubs'dan iletileri yükleme ve işleme hakkında daha fazla bilgi edinmek için bkz. Azure Event Hubs'ı DLT veri kaynağı olarak kullanma.