Aracılığıyla paylaş


Öğretici: İlk DLT işlem hattınızı çalıştırma

Bu öğreticide ilk DLT işlem hattınızı yapılandırma, temel ETL kodu yazma ve işlem hattı güncelleştirmesini çalıştırma adımları izlenir.

Bu öğreticideki tüm adımlar Unity Kataloğu'nu etkinleştirmiş çalışma alanları için tasarlanmıştır. Ayrıca DLT işlem hatlarını eski Hive meta veri deposuyla çalışacak şekilde yapılandırabilirsiniz. Eski Hive meta veri deposu ile DLT işlem hatlarını kullanma için bkz. .

Not

Bu öğreticide Databricks not defterlerini kullanarak yeni işlem hattı kodu geliştirme ve doğrulama yönergeleri yer alır. Python veya SQL dosyalarındaki kaynak kodu kullanarak işlem hatlarını da yapılandırabilirsiniz.

DLT söz dizimi kullanılarak yazılmış kaynak kodunuz varsa kodunuzu çalıştırmak için bir işlem hattı yapılandırabilirsiniz. Bkz. Bir DLT işlem hattını yapılandırın.

Unity Kataloğu tarafından yönetilen nesneler olarak materyalize edilmiş görünümler ve akış tabloları için yenileme zamanlamalarını kaydetmek ve ayarlamak amacıyla Databricks SQL'de tam bildirim temelli SQL söz dizimini kullanabilirsiniz. Bkz. Databricks SQL'de gerçekleştirilen görünümleri kullanma ve Databricks SQL'de akış tablolarını kullanarak veri yükleme.

Örnek: New York bebek adları verilerini alma ve işleme

Bu makaledeki örnekte, New York Eyalet bebek adlarının kayıtlarını içeren genel kullanıma açık bir veri kümesi kullanılır. Bu örnekte, DLT işlem hattının aşağıdakiler için kullanılması gösterilmektedir:

  • Bir birimden ham CSV verilerini bir tabloya oku.
  • Alma tablosundaki kayıtları okuyun ve temizlenmiş veriler içeren yeni bir tablo oluşturmak için DLT beklentileri kullanın.
  • Temizlenen kayıtları türetilmiş veri kümeleri oluşturan DLT sorgularına giriş olarak kullanın.

Bu kod, madalyon mimarisinin basitleştirilmiş bir örneğini gösterir. Bkz. Madalyon göl evi mimarisi nedir?.

Bu örneğin uygulamaları Python ve SQL için sağlanır. Yeni bir işlem hattı ve not defteri oluşturmak için adımları izleyin ve ardından sağlanan kodu kopyalayıp yapıştırın.

Tam koda sahip örnek not defterleri de sağlanır.

Gereksinimler

  • İşlem hattını başlatmak için küme oluşturma iznine veya DLT kümesi tanımlayan bir küme ilkesine erişiminiz olmalıdır. DLT çalışma zamanı, işlem hattınızı çalıştırmadan önce bir küme oluşturur ve doğru izne sahip değilseniz başarısız olur.
  • Tüm kullanıcılar varsayılan olarak sunucusuz işlem hatlarını kullanarak güncelleştirmeleri tetikleyebilir. Sunucusuz hesap düzeyinde etkinleştirilmelidir ve çalışma alanı bölgenizde kullanılamayabilir. Bkz. Sunucusuz işlemeyi etkinleştir.
  • Bu öğreticide yer alan örneklerde Unity Kataloğukullanılır. Databricks, hedef şemada birden çok veritabanı nesnesi oluşturulduğundan bu öğreticiyi çalıştırmak için yeni bir şema oluşturulmasını önerir.

    • Katalogda yeni şema oluşturmak için ALL PRIVILEGES veya USE CATALOG ve CREATE SCHEMA ayrıcalıklarınız olmalıdır.
    • Yeni şema oluşturamıyorsanız, bu öğreticiyi var olan bir şemada çalıştırın. Aşağıdaki ayrıcalıklara sahip olmanız gerekir:
      • Ana katalog için USE CATALOG.
      • hedef şemada ALL PRIVILEGES veya USE SCHEMA, CREATE MATERIALIZED VIEWve CREATE TABLE ayrıcalıkları.
    • Bu öğreticide örnek verileri depolamak için bir birim kullanılır. Databricks, bu öğretici için yeni bir birim oluşturulmasını önerir. Bu öğretici için yeni bir şema oluşturursanız, bu şemada yeni bir birim oluşturabilirsiniz.
      • Mevcut şemada yeni bir birim oluşturmak için aşağıdaki ayrıcalıklara sahip olmanız gerekir:
        • Ana katalog için USE CATALOG.
        • hedef şemada ALL PRIVILEGES veya USE SCHEMA ve CREATE VOLUME ayrıcalıkları.
      • İsteğe bağlı olarak mevcut bir birimi kullanabilirsiniz. Aşağıdaki ayrıcalıklara sahip olmanız gerekir:
        • Üst katalog için USE CATALOG.
        • Ana şema için USE SCHEMA.
        • Hedef birimde ALL PRIVILEGES veya READ VOLUME ve WRITE VOLUME.

    Bu izinleri ayarlamak için Databricks yöneticinize başvurun. Unity Kataloğu ayrıcalıkları hakkında daha fazla bilgi için bkz. Unity Kataloğu ayrıcalıkları ve güvenliği sağlanabilir nesneler.

Adım 0: Verileri indirme

Bu örnek bir Unity Kataloğu biriminden veri yükler. Aşağıdaki kod bir CSV dosyasını indirir ve belirtilen birimde depolar. Yeni bir not defteri açın ve bu verileri belirtilen birime indirmek için aşağıdaki kodu çalıştırın:

import urllib

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

urllib.request.urlretrieve(download_url, volume_path + filename)

<catalog-name>, <schema-name>ve <volume-name> yerine Unity Kataloğu biriminin katalog, şema ve birim adlarını yazın. Sağlanan kod, bu nesneler yoksa belirtilen şemayı ve birimi oluşturmaya çalışır. Unity Kataloğu'nda nesneler oluşturmak ve nesnelere yazmak için uygun ayrıcalıklara sahip olmanız gerekir. bkz. Gereksinimleri.

Not

Eğitime devam etmeden önce bu not defterinin başarıyla çalıştırıldığından emin olun. Bu not defterini işlem hattınızın bir parçası olarak yapılandırmayın.

1. Adım: İşlem hattı oluşturma

DLT, DLT söz dizimlerini kullanarak not defterlerinde veya dosyalarda tanımlanan bağımlılıkları (kaynak kodu olarak adlandırılır) çözümleyerek işlem hatları oluşturur. Her kaynak kodu dosyası yalnızca bir dil içerebilir, ancak işlem hattına dile özgü birden çok not defteri veya dosya ekleyebilirsiniz.

Önemli

Kaynak kodu alanındaki hiçbir varlığı yapılandırmayın. Bu alanı siyah bırakmak, kaynak kodu yazma için bir not defteri oluşturur ve yapılandırılır.

Bu öğreticideki yönergeler sunucusuz işlem ve Unity Kataloğu'nu kullanır. Bu yönergelerde belirtilmeyen tüm yapılandırma seçenekleri için varsayılan ayarları kullanın.

Not

Çalışma alanınızda sunucusuz etkinleştirilmediyse veya desteklenmiyorsa, varsayılan işlem ayarlarını kullanarak öğreticiyi yazıldı olarak tamamlayabilirsiniz. İşlem hattı oluşturma kullanıcı arabiriminin Hedef bölümünde Depolama seçenekleri altında Unity Kataloğu el ile seçmeniz gerekir.

Yeni bir işlem hattı yapılandırmak için aşağıdakileri yapın:

  1. Kenar çubuğunda DLTöğesine tıklayın.
  2. İşlem hattı oluşturtıklayın.
  3. İşlem Hattı adı alanına benzersiz bir işlem hattı adı yazın.
  4. Sunucusuz onay kutusunu seçin.
  5. Hedef'nde, tabloların yayımlandığı Unity Kataloğu konumunu yapılandırmak için bir Kataloğu ve Şemasıseçin.
  6. Gelişmiş'nde Yapılandırma ekle'ye tıklayın ve ardından aşağıdaki parametre adlarını kullanarak verileri indirdiğiniz katalog, şema ve birim için işlem hattı parametrelerini tanımlayın:
    • my_catalog
    • my_schema
    • my_volume
  7. Oluştur'u tıklayın.

Yeni işlem hattı için işlem hatları kullanıcı arabirimi görüntülenir. İşlem hattı için otomatik olarak bir kaynak kodu not defteri oluşturulur ve yapılandırılır.

Not defteri, kullanıcı dizininizdeki yeni bir dizinde oluşturulur. Yeni dizin ve dosyanın adı, işlem hattınızın adıyla eşleşsin. Örneğin, /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Bu not defterine erişim bağlantısı, İşlem hattı ayrıntıları panelindeki Kaynak kodu alanının altındadır. Sonraki adıma geçmeden önce not defterini açmak için bağlantıya tıklayın.

2. Adım: Python veya SQL ile bir not defteri içinde malzemeleşmiş görünümler ve akış tabloları bildirin.

DLT işlem hatları için kaynak kodunu etkileşimli olarak geliştirmek ve doğrulamak için Datbricks not defterlerini kullanabilirsiniz. Bu işlevi kullanmak için not defterinizi işlem hattına eklemeniz gerekir. Yeni oluşturduğunuz not defterini yeni oluşturduğunuz işlem hattına eklemek için:

  1. Sağ üstteki Connect'e tıklayarak işlem yapılandırma menüsünü açın.
  2. 1. Adımda oluşturduğunuz işlem hattının adının üzerine gelin.
  3. tıklayın.Bağlan'ı seçin.

Kullanıcı arabirimi, sağ üstteki Doğrula ve Başlat düğmelerini içerecek şekilde değişir. İşlem hattı kodu geliştirme için not defteri desteği hakkında daha fazla bilgi edinmek için not defterlerinde DLT işlem hatlarını geliştirme ve hatalarını ayıklama bölümüne bakın.

Önemli

  • DLT işlem hatları, planlama sırasında not defterindeki tüm hücreleri değerlendirir. Tüm amaçlı işlemlerde çalıştırılan veya iş olarak zamanlanan not defterlerinden farklı olarak, işlem hatları hücrelerin belirtilen sırada çalışmasını garanti etmemektedir.
  • Not defterleri yalnızca tek bir programlama dili içerebilir. İşlem hattı kaynak kodu not defterlerinde Python ve SQL kodunu karıştırmayın.

Python veya SQL ile kod geliştirme hakkında ayrıntılı bilgi için Python ile işlem hattı kodu geliştirme veya SQL ile işlem hattı kodu geliştirmebölümlerine bakın.

Örnek işlem hattı kodu

Bu öğreticideki örneği uygulamak için aşağıdaki kodu kopyalayıp işlem hattınızın kaynak kodu olarak yapılandırılmış not defterindeki bir hücreye yapıştırın.

Sağlanan kod aşağıdakileri yapar:

  • Gerekli modülleri içeri aktarır (yalnızca Python).
  • İşlem hattı yapılandırması sırasında tanımlanan parametrelere başvurur.
  • Bir birimden veri alan baby_names_raw adında bir akış tablosu tanımlar.
  • Alınan verileri doğrulayan baby_names_prepared adlı gerçekleştirilmiş bir görünüm tanımlar.
  • Verilerin son derece iyileştirilmiş bir görünümü olan top_baby_names_2021 adlı maddileştirilmiş bir görünümü tanımlar.

Piton

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Adım 3: İşlem hattı güncelleştirmesini başlatma

İşlem hattı güncelleştirmesini başlatmak için not defteri kullanıcı arabiriminin sağ üst kısmındaki Başlangıç düğmesine tıklayın.

Örnek not defterleri

Aşağıdaki not defterleri, bu makalede sağlanan kod örneklerini içerir. Bu not defterleri, bu makaledeki adımlarla aynı gereksinimlere sahiptir. bkz. Gereksinimleri.

Not defterini içeri aktarmak için aşağıdaki adımları tamamlayın:

  1. Not defteri kullanıcı arabirimini açın.
    • + Yeni>Not Defteriöğesine tıklayın.
    • Boş bir not defteri açılır.
  2. Dosya>İçe Aktar...öğesine tıkla. İçeri Aktar iletişim kutusu görüntülenir.
  3. 'tanİçeri Aktararak URL seçeneğini seçin.
  4. Not defterinin URL'sini yapıştırın.
  5. İçeri Aktar'ıtıklayın.

Bu eğitimde, DLT işlem hattınızı yapılandırmadan ve çalıştırmadan önce bir veri kurulum not defterini çalıştırmanız gerekmektedir. Aşağıdaki not defterini içeri aktarın, not defterini bir işlem kaynağına ekleyin, my_catalog, my_schemave my_volumeiçin gerekli değişkeni doldurun ve Tümünü çalıştır'atıklayın.

İşlem hatları için veri indirme kılavuzu

not defteri alma

Aşağıdaki not defterleri Python veya SQL'de örnekler sağlar. Bir not defterini içeri aktardığınızda, not defteri kullanıcı giriş dizininize kaydedilir.

Aşağıdaki not defterlerinden birini içeri aktardıktan sonra işlem hattı oluşturma adımlarını tamamlayın, ancak indirilen not defterini seçmek için Kaynak kodu dosya seçicisini kullanın. Kaynak kod olarak yapılandırılmış bir not defteriyle işlem hattını oluşturduktan sonra, güncellemeyi başlatmak için işlem hattı kullanıcı arabiriminde Başlat'a tıklayın.

DLT Python not defterini kullanmaya başlama

not defteri alma

DLT SQL not defterini kullanmaya başlama

not defteri alma

Ek kaynaklar