Azure Databricks'te ilk ETL iş yükünüzü çalıştırma
Veri düzenleme için ilk ayıklama, dönüştürme ve yükleme (ETL) işlem hatlarınızı geliştirmek ve dağıtmak için Azure Databricks'in üretime hazır araçlarını kullanmayı öğrenin.
Bu makalenin sonunda kendinizi rahat hissedeceksiniz:
- Databricks çok amaçlı işlem kümesini başlatma.
- Bir Databricks defteri oluşturma.
- Otomatik Yükleyici ile Delta Lake'e artımlı veri alımını yapılandırma.
- Verileri işlemek, sorgulamak ve önizlemek için not defteri hücrelerini yürütme.
- Bir not defterini Databricks görevi olarak zamanlama.
Bu öğreticide Python veya Scala'daki yaygın ETL görevlerini tamamlamak için etkileşimli not defterleri kullanılır.
ETL işlem hatları oluşturmak için DLT de kullanabilirsiniz. Databricks, üretim ETL işlem hatlarını oluşturma, dağıtma ve bakımının karmaşıklığını azaltmak için DLT'yi oluşturmuştur. Bkz. Kılavuz: İlk DLT işlem hattınızı çalıştırma.
Bu makalenin kaynaklarını oluşturmak için Databricks Terraform sağlayıcısını da kullanabilirsiniz. Bkz. Terraform ile kümeler, not defterleri ve işler oluşturma.
Gereksinimler
- Azure Databricks çalışma alanında oturum açtınız.
- Küme oluşturma izniniz var.
Not
Küme denetimi ayrıcalıklarınız yoksa, bir kümeye erişiminiz olduğu sürece aşağıdaki adımların çoğunu yine de tamamlayabilirsiniz.
1. Adım: Küme oluşturma
Keşif veri analizi ve veri mühendisliği yapmak için, komutları yürütmek için gereken işlem kaynaklarını sağlamak üzere bir küme oluşturun.
- Kenar çubuğunda
tıklayın İşlem.
- İşlem sayfasında Küme Oluştur'a tıklayın. Bu, Yeni Küme sayfasını açar.
- Küme için benzersiz bir ad belirtin, kalan değerleri varsayılan durumunda bırakın ve Küme Oluşturöğesine tıklayın.
Databricks kümeleri hakkında daha fazla bilgi edinmek için İşlem sayfasına bakın.
2. Adım: Databricks not defteri oluşturma
Çalışma alanınızda not defteri oluşturmak için kenar çubuğunda Yeni'ye ve ardından Not Defteri'ne tıklayın. Çalışma alanında boş bir not defteri açılır.
Not defterlerini oluşturma ve yönetme hakkında daha fazla bilgi edinmek için bkz . Not defterlerini yönetme.
3. Adım: Verileri Delta Lake'e almak için Otomatik Yükleyici'yi yapılandırma
Databricks, artımlı veri alımı için Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler.
Databricks, Delta Lake ile veri depolamayı önerir. Delta Lake, ACID işlemleri sağlayan ve data lakehouse'a olanak tanıyan açık kaynak bir depolama katmanıdır. Delta Lake, Databricks'te oluşturulan tablolar için varsayılan biçimdir.
Verileri Delta Lake tablosuna almak üzere Otomatik Yükleyici'yi yapılandırmak için aşağıdaki kodu kopyalayıp not defterinizdeki boş hücreye yapıştırın:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Not
Bu kodda tanımlanan değişkenler, mevcut çalışma alanı varlıklarıyla veya diğer kullanıcılarla çakışma riski olmadan bunu güvenli bir şekilde yürütmenize olanak sağlamalıdır. Kısıtlı ağ veya depolama izinleri bu kodu yürütürken hatalara neden olur; bu kısıtlamaları gidermek için çalışma alanı yöneticinize başvurun.
Otomatik Yükleyici hakkında daha fazla bilgi edinmek için bkz . Otomatik Yükleyici nedir?.
4. Adım: Verileri işleme ve verilerle etkileşim kurma
Not defterleri mantığı hücre hücre yürütür. Hücrenizdeki mantığı yürütmek için:
Önceki adımda tamamladığınız hücreyi çalıştırmak için hücreyi seçin ve SHIFT+ENTER
basın. Yeni oluşturduğunuz tabloyu sorgulamak için, aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından hücreyi çalıştırmak için SHIFT+ENTER
basın. Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
DataFrame'inizdeki verilerin önizlemesini görüntülemek için aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.
Python
display(df)
Scala
display(df)
Verileri görselleştirmeye yönelik etkileşimli seçenekler hakkında daha fazla bilgi edinmek için bkz . Databricks not defterlerindeki görselleştirmeler.
5. Adım: İş zamanlama
Databricks not defterlerini bir Databricks işine görev olarak ekleyerek üretim betikleri olarak çalıştırabilirsiniz. Bu adımda, el ile tetikleyebileceğiniz yeni bir iş oluşturacaksınız.
Not defterinizi görev olarak zamanlamak için:
- Üst bilgi çubuğunun sağ tarafındaki Zamanla'ya tıklayın.
- İş adı için benzersiz bir ad girin.
- El ile'ye tıklayın.
- Kümesi açılır listesinden, 1. adımda oluşturduğunuz kümeyi seçin.
- Oluştur’a tıklayın.
- Görüntülenen pencerede, Şimdi çalıştıröğesine tıklayın.
- İş çalıştırma sonuçlarına bakmak için Son çalıştırma zaman damgasının yanındaki
simgesine tıklayın.
İşler hakkında daha fazla bilgi için bkz. İşler nedir?.
Ek tümleştirmeler
Azure Databricks ile veri mühendisliğine yönelik tümleştirmeler ve araçlar hakkında daha fazla bilgi edinin: