Bagikan melalui


Membuat alur Unity Catalog dengan cara mengkloning alur metastore Apache Hive

Artikel ini menjelaskan permintaan clone a pipeline di Databricks REST API dan bagaimana Anda dapat menggunakannya untuk menyalin alur yang ada yang diterbitkan ke metastore Apache Hive ke alur baru yang diterbitkan ke Katalog Unity. Saat Anda memanggil permintaan clone a pipeline, permintaan tersebut:

  • Menyalin kode sumber dan konfigurasi dari pipeline yang ada ke pipeline baru, menerapkan penyesuaian konfigurasi yang Anda tentukan.
  • Memperbarui tampilan Materialisasi dan Definisi dan referensi tabel Streaming dengan perubahan yang diperlukan agar objek tersebut dikelola oleh Katalog Unity.
  • Memulai pembaruan pipelin untuk memigrasikan data dan metadata yang ada, seperti titik pemeriksaan, untuk tabel streaming apa pun dalam pipelin. Ini memungkinkan tabel Streaming tersebut untuk melanjutkan pemrosesan pada titik yang sama dengan alur asli.

Setelah operasi kloning selesai, alur asli dan baru dapat berjalan secara independen.

Artikel ini mencakup contoh panggilan permintaan API secara langsung dan melalui skrip Python dari buku catatan Databricks.

Sebelum Anda mulai

Berikut ini yang diperlukan sebelum mengkloning pipeline:

  • Untuk mengkloning pipeline metastore Hive, tabel dan tampilan yang ditentukan dalam pipeline harus dipublikasikan ke dalam skema target. Untuk mempelajari cara menambahkan skema target ke alur pemrosesan, lihat Cara mengonfigurasi alur untuk menerbitkan ke metastore Apache Hive.

  • Referensi ke tabel atau tampilan terkelola Hive metastore dalam alur pipa yang akan dikloning harus diberi kualifikasi sepenuhnya dengan katalog (hive_metastore), skema, dan nama tabel. Misalnya, dalam kode berikut yang membuat himpunan data customers, argumen nama tabel harus diperbarui ke hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Jangan mengedit kode sumber untuk pipeline metastore Hive sumber saat operasi kloning sedang berlangsung, termasuk notebook yang dikonfigurasi sebagai bagian dari pipeline dan modul apa pun yang disimpan dalam folder Git atau file pada workspace.

  • Alur sumber metastore Apache Hive harus tidak berjalan saat Anda memulai operasi kloning. Jika pembaruan berjalan, hentikan atau tunggu hingga selesai.

Berikut ini adalah pertimbangan penting lainnya sebelum mengkloning alur:

  • Jika tabel dalam alur metastore Apache Hive menentukan lokasi penyimpanan menggunakan argumen path di Python atau LOCATION di SQL, teruskan konfigurasi "pipelines.migration.ignoreExplicitPath": "true" ke permintaan kloning. Pengaturan konfigurasi ini disertakan dalam instruksi di bawah ini.
  • Jika alur metastore Apache Hive menyertakan sumber Auto Loader yang menentukan nilai untuk opsi cloudFiles.schemaLocation, dan alur metastore Apache Hive akan tetap beroperasi setelah membuat kloning Unity Catalog, Anda harus mengatur opsi mergeSchema ke true pada alur metastore Apache Hive dan alur Unity Catalog yang dikloning. Menambahkan opsi ini ke pipeline metastore Apache Hive sebelum mengkloning akan menyalin opsi tersebut ke pipeline baru.

Mengkloning alur dengan Databricks REST API

Contoh berikut menggunakan perintah curl untuk memanggil permintaan clone a pipeline di Databricks REST API:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Mengganti:

  • <personal-access-token> dengan akses token pribadi databricks .
  • <databricks-instance> dengan nama instans ruang kerja Azure Databricks, misalnya adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> dengan pengidentifikasi unik dari pipeline metastore Hive yang akan diklon. Anda dapat menemukan ID alur di UI DLT.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Mengganti:

  • <target-catalog-name> memiliki nama katalog di Unity Catalog tempat alur baru harus diterbitkan. Ini harus merupakan katalog yang ada.
  • <target-schema-name> dengan nama skema di Unity Catalog ke mana alur baru harus diterbitkan jika berbeda dari nama skema saat ini. Parameter ini bersifat opsional, dan jika tidak ditentukan, nama skema yang ada digunakan.
  • <new-pipeline-name> dengan nama opsional untuk pipa baru. Jika tidak ditentukan, alur baru diberi nama menggunakan nama alur sumber dengan [UC] ditambahkan.

clone_mode menentukan mode yang akan digunakan untuk operasi kloning. MIGRATE_TO_UC adalah satu-satunya opsi yang didukung.

Gunakan bidang configuration untuk menentukan konfigurasi pada alur baru. Nilai yang diatur di sini menggantikan konfigurasi di alur asli.

Respon dari permintaan REST API clone adalah ID pipeline dari pipeline baru Unity Catalog.

Mengkloning alur dari buku catatan Databricks

Contoh berikut memanggil permintaan create a pipeline dari skrip Python. Anda bisa menggunakan buku catatan Databricks untuk menjalankan skrip ini:

  1. Buat buku catatan baru untuk skrip. Lihat Membuat buku catatan.
  2. Salin skrip Python berikut ke sel pertama buku catatan.
  3. Perbarui nilai placeholder dalam skrip dengan mengganti:
    • <databricks-instance> dengan nama instans ruang kerja Azure Databricks, misalnya adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> dengan pengidentifikasi unik dari alur metastore Hive untuk dikloning. Anda dapat menemukan ID alur di UI DLT.
    • <target-catalog-name> dengan nama katalog di Unity Catalog tempat alur baru harus diterbitkan. Ini harus berupa katalog yang ada.
    • <target-schema-name> dengan nama skema di Unity Catalog tempat alur baru tersebut harus diterbitkan jika berbeda dari nama skema saat ini. Parameter ini bersifat opsional, dan jika tidak ditentukan, nama skema yang ada digunakan.
    • <new-pipeline-name> dengan nama opsional untuk jalur baru. Jika tidak ditentukan, alur baru diberi nama menggunakan nama alur sumber dengan [UC] ditambahkan.
  4. Jalankan skrip. Lihat Menjalankan notebook Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Keterbatasan

Berikut ini adalah batasan permintaan API clone a pipeline DLT:

  • Hanya pengkloningan dari pipeline yang dikonfigurasi untuk menggunakan database metastore Hive ke pipeline Katalog Unity didukung.
  • Anda hanya dapat membuat kloning di ruang kerja Azure Databricks yang sama dengan alur tempat Anda mengkloning.
  • Alur yang Anda kloning hanya dapat menyertakan sumber streaming berikut:
    • Sumber delta
    • Auto Loader, termasuk sumber data apa pun yang didukung oleh Auto Loader. Lihat Memuat file dari penyimpanan objek cloud.
    • Apached Kafka dengan Streaming Terstruktur. Namun, sumber Kafka tidak dapat dikonfigurasi untuk menggunakan opsi kafka.group.id. Lihat pemrosesan Stream dengan Apache Kafka dan Azure Databricks.
    • Amazon Kinesis dengan Streaming Terstruktur. Namun, sumber Kinesis tidak dapat dikonfigurasi untuk mengatur consumerMode ke efo.
  • Jika alur metastore Apache Hive yang Anda kloning menggunakan mode pemberitahuan file Auto Loader, Databricks merekomendasikan untuk tidak menjalankan alur metastore Apache Hive setelah kloning. Ini karena menjalankan alur kerja metastore Hive menghasilkan penghapusan beberapa peristiwa notifikasi file dari klon Unity Catalog. Jika alur metastore sumber Hive berjalan setelah operasi kloning selesai, Anda dapat memulihkan file yang hilang menggunakan Auto Loader dengan opsi cloudFiles.backfillInterval. Untuk mempelajari tentang mode pemberitahuan file Auto Loader, lihat Apa itu mode pemberitahuan file Auto Loader?. Untuk mempelajari tentang backfilling file dengan Auto Loader, lihat Memicu backfills reguler menggunakan opsi cloudFiles.backfillInterval dan Common Auto Loader.
  • Tugas pemeliharaan alur secara otomatis dijeda untuk kedua alur saat kloning sedang berlangsung.
  • Berikut ini berlaku untuk kueri perjalanan waktu terhadap tabel dalam alur Unity Catalog yang dikloning:
    • Jika versi tabel awalnya ditulis ke objek terkelola metastore Apache Hive, kueri lintas waktu menggunakan klausa timestamp_expression tidak terdefinisi ketika mengkueri objek Katalog Unity yang dikloning.
    • Namun, jika versi tabel ditulis ke objek kloning Unity Catalog, kueri perjalanan waktu dengan menggunakan klausa timestamp_expression berfungsi dengan benar.
    • Kueri perjalanan waktu menggunakan klausa version bekerja dengan benar saat mengkueri objek kloning Katalog Unity, bahkan ketika versinya awalnya ditulis ke objek terkelola metastore Apache Hive.
  • Untuk batasan lain saat menggunakan DLT dengan Unity Catalog, lihat batasan alur Unity Catalog.
  • Untuk batasan Katalog Unity, lihat .