Aracılığıyla paylaş


Bir Hive meta veri deposu işlem hattını kopyalayarak bir Unity Kataloğu işlem hattı oluşturun.

Bu makalede Databricks REST API'sindeki clone a pipeline isteği ve Bunu kullanarak Hive meta veri deposunda yayımlanan mevcut bir işlem hattını Unity Kataloğu'nda yayımlayan yeni bir işlem hattına nasıl kopyalayabileceğiniz açıklanır. clone a pipeline isteğini çağırdığınızda, şu işlemleri yapar:

  • Kaynak kodu ve yapılandırmayı mevcut işlem hattından yenisine kopyalar ve belirttiğiniz yapılandırma geçersiz kılmalarını uygular.
  • Unity Kataloğu tarafından yönetilecek nesneler için Malzeme görünümü ve Akış tablosu tanımları ve başvurularını gerekli değişikliklerle günceller.
  • Boru hattındaki her türlü akış tablosu için denetim noktaları gibi mevcut veri ve meta verileri geçirmek amacıyla bir güncelleme sürecini başlatır. Bu, bu Akış tablolarının işlemeye özgün işlem hattıyla aynı noktada devam etmesini sağlar.

Kopyalama işlemi tamamlandıktan sonra hem özgün hem de yeni işlem hatları bağımsız olarak çalıştırılabilir.

Bu makale, API isteğini doğrudan ve Databricks not defterinden python betiği aracılığıyla çağırma örnekleri içerir.

Başlamadan önce

İşlem hattını kopyalamadan önce aşağıdakiler gereklidir:

  • Hive meta veri deposu işlem hattını kopyalamak için, işlem hattında tanımlanan tabloların ve görünümlerin tabloları hedef şemaya yayımlaması gerekir. İşlem hattına hedef şema eklemeyi öğrenmek için bkz. Hive meta veri deposuna yayımlamak için işlem hattı yapılandırma.

  • Kopyalanacak hive meta veri deposu yönetilen tablolarına veya işlem hattındaki görünümlere yapılan başvurular katalog (hive_metastore), şema ve tablo adıyla tam olarak nitelenmelidir. Örneğin, aşağıdaki kodda customers veri kümesi oluştururken tablo adı bağımsız değişkeni hive_metastore.sales.customersolarak güncelleştirilmelidir:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • İşlem hattının parçası olarak yapılandırılan not defterleri ve Git klasörleri veya çalışma alanı dosyalarında depolanan modüller de dahil olmak üzere kopyalama işlemi devam ederken kaynak Hive meta veri deposu işlem hattının kaynak kodunu düzenlemeyin.

  • Kopyalama işlemini başlattığınızda kaynak Hive meta veri deposu işlem hattı çalışmıyor olmalıdır. Bir güncelleştirme çalışıyorsa, bunu durdurun veya tamamlanmasını bekleyin.

İşlem hattını kopyalamadan önce dikkat edilmesi gereken diğer önemli noktalar şunlardır:

  • Hive meta veri deposu işlem hattındaki tablolar Python'da path bağımsız değişkenini veya SQL'de LOCATION kullanarak bir depolama konumu belirtiyorsa, "pipelines.migration.ignoreExplicitPath": "true" yapılandırmasını kopyalama isteğine iletin. Bu yapılandırmanın ayarlanması aşağıdaki yönergelere eklenmiştir.
  • Hive meta veri deposu işlem hattı, cloudFiles.schemaLocation seçeneği için bir değer belirten bir Otomatik Yükleyici kaynağı içeriyorsa ve Unity Kataloğu kopyası oluşturulduktan sonra Hive meta veri deposu işlem hattı çalışır durumda kalacaksa, hem Hive meta veri deposu işlem hattında hem de kopyalanan Unity Kataloğu işlem hattında true için mergeSchema seçeneğini ayarlamanız gerekir. Klonlamadan önce bu seçeneği Hive meta veri deposu işlem hattına eklemek, seçeneği yeni işlem hattına kopyalar.

Databricks REST API ile işlem hattını kopyalama

Aşağıdaki örnek, Databricks REST API'sindeki clone a pipeline isteğini çağırmak için curl komutunu kullanır:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Değiştirmek:

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Değiştirmek:

  • <target-catalog-name>, yeni işlem hattının yayımlaması gereken bir katalog adı olarak Unity Kataloğu'nda yer almalıdır. Bu, mevcut bir katalog olmalıdır.
  • <target-schema-name>, geçerli şema adından farklıysa yeni işlem hattının yayımlaması gereken Unity Kataloğu'ndaki bir şemanın adıyla birlikte. Bu parametre isteğe bağlıdır ve belirtilmezse mevcut şema adı kullanılır.
  • <new-pipeline-name> yeni işlem hattı için isteğe bağlı bir adla. Belirtilmezse, yeni işlem hattı, [UC] eklenmiş olarak kaynak işlem hattı adı kullanılarak adlandırılır.

clone_mode kopyalama işlemi için kullanılacak modu belirtir. desteklenen tek seçenek MIGRATE_TO_UC.

Yeni işlem hattında yapılandırmaları belirtmek için configuration alanını kullanın. Burada ayarlanan değerler, özgün işlem hattındaki yapılandırmaları geçersiz kılar.

clone REST API isteğinden gelen yanıt, yeni Unity Catalog işlem hattının kimliğidir.

Databricks defterinden bir iş akışını kopyalama

Aşağıdaki örnekte python betiğinden create a pipeline isteği çağrılır. Bu betiği çalıştırmak için Databricks not defteri kullanabilirsiniz:

  1. Betik için yeni bir not defteri oluşturun. Bkz. Bir not defteri oluşturun.
  2. Aşağıdaki Python betiğini not defterinin ilk hücresine kopyalayın.
  3. Betikteki yer tutucu değerlerini, değiştirerek güncelleyin:
    • <databricks-instance> ile Azure Databricks çalışma alanı örneği adı , örneğin adb-1234567890123456.7.azuredatabricks.net
    • Klonlanacak Hive metastore işlem hattının benzersiz tanımlayıcısı <pipeline-id>. İşlem hattı kimliğini DLT kullanıcı arayüzündebulabilirsiniz.
    • Unity Kataloğu'nda yeni işlem hattının yayımlaması gereken bir katalog adına sahip <target-catalog-name>. Bu, mevcut bir katalog olmalıdır.
    • <target-schema-name>, geçerli şema adından farklıysa yeni işlem hattının yayımlaması gereken Unity Kataloğu'ndaki bir şemanın adıyla birlikte. Bu parametre isteğe bağlıdır ve belirtilmezse mevcut şema adı kullanılır.
    • yeni işlem hattı için isteğe bağlı bir adla <new-pipeline-name>. Belirtilmezse, yeni işlem hattı, [UC] eklenmiş olarak kaynak işlem hattı adı kullanılarak adlandırılır.
  4. Betik çalıştır. Bknz. Databricks not defterlerini çalıştırma.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Sınırlama

DLT clone a pipeline API isteğinin sınırlamaları şunlardır:

  • Yalnızca Hive meta veri depounu bir Unity Kataloğu işlem hattına kullanmak üzere yapılandırılmış bir işlem hattından kopyalama desteklenir.
  • Kopyayı yalnızca kopyaladığınız işlem hattıyla aynı Azure Databricks çalışma alanında oluşturabilirsiniz.
  • Kopyaladığınız işlem hattı yalnızca aşağıdaki akış kaynaklarını içerebilir:
    • Delta kaynakları
    • Otomatik Yükleyici tarafından desteklenen tüm veri kaynakları dahil olmak üzere Otomatik Yükleyici. Bkz. Bulut nesne depolama alanından dosya yükleme.
    • Yapılandırılmış Akış ile Apached Kafka. Ancak Kafka kaynağı kafka.group.id seçeneğini kullanacak şekilde yapılandırılamaz. Bkz. Apache Kafka ve Azure Databricks ileStream işleme.
    • Yapılandırılmış Akış ile Amazon Kinesis. Ancak Kinesis kaynağı, consumerModeefoolarak ayarlanacak şekilde yapılandırılamaz.
  • Kopyaladığınız Hive meta veri deposu işlem hattı Otomatik Yükleyici dosya bildirim modunu kullanıyorsa Databricks, kopyalama işleminden sonra Hive meta veri deposu işlem hattını çalıştırmamanızı önerir. Bunun nedeni Hive meta veri deposu işlem hattının çalıştırılmasının Unity Kataloğu kopyasından bazı dosya bildirim olaylarının düşmesine neden olmasıdır. Kopyalama işlemi tamamlandıktan sonra kaynak Hive meta veri deposu işlem hattı çalıştırılırsa, cloudFiles.backfillInterval seçeneğiyle Otomatik Yükleyici'yi kullanarak eksik dosyaları yedekleyebilirsiniz. Otomatik Yükleyici dosya bildirim modu hakkında bilgi edinmek için bkz. Otomatik Yükleyici dosyası bildirim modu nedir?. Otomatik Yükleyici ile dosyaları geri doldurma hakkında bilgi edinmek için cloudFiles.backfillInterval kullanarak düzenli geri doldurmaları tetikleme ve Ortak Otomatik Yükleyici seçeneklerikonularına bakın.
  • Kopyalama işlemi devam ederken, boru hattı bakım görevleri her iki boru hattı için de otomatik olarak duraklatılır.
  • Aşağıdakiler, kopyalanan Unity Kataloğu işlem hattındaki tablolara yönelik zaman yolculuğu sorguları için geçerlidir:
    • Tablo sürümü başlangıçta Hive meta veri deposu tarafından yönetilen bir nesneye yazıldıysa, kopyalanan Unity Kataloğu nesnesi sorgulandığında timestamp_expression yan tümcesini kullanan zaman yolculuğu sorguları belirsizdir.
    • Ancak, tablo sürümü kopyalanan Unity Catalog nesnesine yazılmışsa, timestamp_expression ifadesi kullanan zaman yolculuğu sorguları düzgün çalışır.
    • version yan tümcesi kullanan zaman yolculuğu sorguları, klonlanmış Unity Kataloğu nesnesi sorgulandığında, sürüm başlangıçta Hive meta veri deposu yönetilen nesnesine yazıldığında bile düzgün çalışır.
  • Unity Kataloğu ile DLT kullanırken diğer sınırlamalar için bkz. Unity Kataloğu işlem hattı sınırlamaları.
  • Unity Kataloğu sınırlamaları için bkz. Unity Kataloğu sınırlamaları.