Bir Hive meta veri deposu işlem hattını kopyalayarak bir Unity Kataloğu işlem hattı oluşturun.
Bu makalede Databricks REST API'sindeki clone a pipeline
isteği ve Bunu kullanarak Hive meta veri deposunda yayımlanan mevcut bir işlem hattını Unity Kataloğu'nda yayımlayan yeni bir işlem hattına nasıl kopyalayabileceğiniz açıklanır.
clone a pipeline
isteğini çağırdığınızda, şu işlemleri yapar:
- Kaynak kodu ve yapılandırmayı mevcut işlem hattından yenisine kopyalar ve belirttiğiniz yapılandırma geçersiz kılmalarını uygular.
- Unity Kataloğu tarafından yönetilecek nesneler için Malzeme görünümü ve Akış tablosu tanımları ve başvurularını gerekli değişikliklerle günceller.
- Boru hattındaki her türlü akış tablosu için denetim noktaları gibi mevcut veri ve meta verileri geçirmek amacıyla bir güncelleme sürecini başlatır. Bu, bu Akış tablolarının işlemeye özgün işlem hattıyla aynı noktada devam etmesini sağlar.
Kopyalama işlemi tamamlandıktan sonra hem özgün hem de yeni işlem hatları bağımsız olarak çalıştırılabilir.
Bu makale, API isteğini doğrudan ve Databricks not defterinden python betiği aracılığıyla çağırma örnekleri içerir.
Başlamadan önce
İşlem hattını kopyalamadan önce aşağıdakiler gereklidir:
Hive meta veri deposu işlem hattını kopyalamak için, işlem hattında tanımlanan tabloların ve görünümlerin tabloları hedef şemaya yayımlaması gerekir. İşlem hattına hedef şema eklemeyi öğrenmek için bkz. Hive meta veri deposuna yayımlamak için işlem hattı yapılandırma.
Kopyalanacak hive meta veri deposu yönetilen tablolarına veya işlem hattındaki görünümlere yapılan başvurular katalog (
hive_metastore
), şema ve tablo adıyla tam olarak nitelenmelidir. Örneğin, aşağıdaki koddacustomers
veri kümesi oluştururken tablo adı bağımsız değişkenihive_metastore.sales.customers
olarak güncelleştirilmelidir:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
İşlem hattının parçası olarak yapılandırılan not defterleri ve Git klasörleri veya çalışma alanı dosyalarında depolanan modüller de dahil olmak üzere kopyalama işlemi devam ederken kaynak Hive meta veri deposu işlem hattının kaynak kodunu düzenlemeyin.
Kopyalama işlemini başlattığınızda kaynak Hive meta veri deposu işlem hattı çalışmıyor olmalıdır. Bir güncelleştirme çalışıyorsa, bunu durdurun veya tamamlanmasını bekleyin.
İşlem hattını kopyalamadan önce dikkat edilmesi gereken diğer önemli noktalar şunlardır:
- Hive meta veri deposu işlem hattındaki tablolar Python'da
path
bağımsız değişkenini veya SQL'deLOCATION
kullanarak bir depolama konumu belirtiyorsa,"pipelines.migration.ignoreExplicitPath": "true"
yapılandırmasını kopyalama isteğine iletin. Bu yapılandırmanın ayarlanması aşağıdaki yönergelere eklenmiştir. - Hive meta veri deposu işlem hattı,
cloudFiles.schemaLocation
seçeneği için bir değer belirten bir Otomatik Yükleyici kaynağı içeriyorsa ve Unity Kataloğu kopyası oluşturulduktan sonra Hive meta veri deposu işlem hattı çalışır durumda kalacaksa, hem Hive meta veri deposu işlem hattında hem de kopyalanan Unity Kataloğu işlem hattındatrue
içinmergeSchema
seçeneğini ayarlamanız gerekir. Klonlamadan önce bu seçeneği Hive meta veri deposu işlem hattına eklemek, seçeneği yeni işlem hattına kopyalar.
Databricks REST API ile işlem hattını kopyalama
Aşağıdaki örnek, Databricks REST API'sindeki clone a pipeline
isteğini çağırmak için curl
komutunu kullanır:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Değiştirmek:
-
<personal-access-token>
, Databricks kişisel erişim anahtarıile. -
<databricks-instance>
ile Azure Databricks çalışma alanı örneği adı, örneğinadb-1234567890123456.7.azuredatabricks.net
- Klonlanacak Hive meta veri deposu işlem hattının benzersiz tanımlayıcısıyla
<pipeline-id>
. İşlem hattı kimliğini DLT kullanıcı arabirimindebulabilirsiniz.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Değiştirmek:
-
<target-catalog-name>
, yeni işlem hattının yayımlaması gereken bir katalog adı olarak Unity Kataloğu'nda yer almalıdır. Bu, mevcut bir katalog olmalıdır. -
<target-schema-name>
, geçerli şema adından farklıysa yeni işlem hattının yayımlaması gereken Unity Kataloğu'ndaki bir şemanın adıyla birlikte. Bu parametre isteğe bağlıdır ve belirtilmezse mevcut şema adı kullanılır. -
<new-pipeline-name>
yeni işlem hattı için isteğe bağlı bir adla. Belirtilmezse, yeni işlem hattı,[UC]
eklenmiş olarak kaynak işlem hattı adı kullanılarak adlandırılır.
clone_mode
kopyalama işlemi için kullanılacak modu belirtir. desteklenen tek seçenek MIGRATE_TO_UC
.
Yeni işlem hattında yapılandırmaları belirtmek için configuration
alanını kullanın. Burada ayarlanan değerler, özgün işlem hattındaki yapılandırmaları geçersiz kılar.
clone
REST API isteğinden gelen yanıt, yeni Unity Catalog işlem hattının kimliğidir.
Databricks defterinden bir iş akışını kopyalama
Aşağıdaki örnekte python betiğinden create a pipeline
isteği çağrılır. Bu betiği çalıştırmak için Databricks not defteri kullanabilirsiniz:
- Betik için yeni bir not defteri oluşturun. Bkz. Bir not defteri oluşturun.
- Aşağıdaki Python betiğini not defterinin ilk hücresine kopyalayın.
- Betikteki yer tutucu değerlerini, değiştirerek güncelleyin:
-
<databricks-instance>
ile Azure Databricks çalışma alanı örneği adı , örneğinadb-1234567890123456.7.azuredatabricks.net
- Klonlanacak Hive metastore işlem hattının benzersiz tanımlayıcısı
<pipeline-id>
. İşlem hattı kimliğini DLT kullanıcı arayüzündebulabilirsiniz. - Unity Kataloğu'nda yeni işlem hattının yayımlaması gereken bir katalog adına sahip
<target-catalog-name>
. Bu, mevcut bir katalog olmalıdır. -
<target-schema-name>
, geçerli şema adından farklıysa yeni işlem hattının yayımlaması gereken Unity Kataloğu'ndaki bir şemanın adıyla birlikte. Bu parametre isteğe bağlıdır ve belirtilmezse mevcut şema adı kullanılır. - yeni işlem hattı için isteğe bağlı bir adla
<new-pipeline-name>
. Belirtilmezse, yeni işlem hattı,[UC]
eklenmiş olarak kaynak işlem hattı adı kullanılarak adlandırılır.
-
- Betik çalıştır. Bknz. Databricks not defterlerini çalıştırma.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Sınırlama
DLT clone a pipeline
API isteğinin sınırlamaları şunlardır:
- Yalnızca Hive meta veri depounu bir Unity Kataloğu işlem hattına kullanmak üzere yapılandırılmış bir işlem hattından kopyalama desteklenir.
- Kopyayı yalnızca kopyaladığınız işlem hattıyla aynı Azure Databricks çalışma alanında oluşturabilirsiniz.
- Kopyaladığınız işlem hattı yalnızca aşağıdaki akış kaynaklarını içerebilir:
- Delta kaynakları
- Otomatik Yükleyici tarafından desteklenen tüm veri kaynakları dahil olmak üzere Otomatik Yükleyici. Bkz. Bulut nesne depolama alanından dosya yükleme.
- Yapılandırılmış Akış ile Apached Kafka. Ancak Kafka kaynağı
kafka.group.id
seçeneğini kullanacak şekilde yapılandırılamaz. Bkz. Apache Kafka ve Azure Databricks ileStream işleme. - Yapılandırılmış Akış ile Amazon Kinesis. Ancak Kinesis kaynağı,
consumerMode
efo
olarak ayarlanacak şekilde yapılandırılamaz.
- Kopyaladığınız Hive meta veri deposu işlem hattı Otomatik Yükleyici dosya bildirim modunu kullanıyorsa Databricks, kopyalama işleminden sonra Hive meta veri deposu işlem hattını çalıştırmamanızı önerir. Bunun nedeni Hive meta veri deposu işlem hattının çalıştırılmasının Unity Kataloğu kopyasından bazı dosya bildirim olaylarının düşmesine neden olmasıdır. Kopyalama işlemi tamamlandıktan sonra kaynak Hive meta veri deposu işlem hattı çalıştırılırsa,
cloudFiles.backfillInterval
seçeneğiyle Otomatik Yükleyici'yi kullanarak eksik dosyaları yedekleyebilirsiniz. Otomatik Yükleyici dosya bildirim modu hakkında bilgi edinmek için bkz. Otomatik Yükleyici dosyası bildirim modu nedir?. Otomatik Yükleyici ile dosyaları geri doldurma hakkında bilgi edinmek için cloudFiles.backfillInterval kullanarak düzenli geri doldurmaları tetikleme ve Ortak Otomatik Yükleyici seçeneklerikonularına bakın. - Kopyalama işlemi devam ederken, boru hattı bakım görevleri her iki boru hattı için de otomatik olarak duraklatılır.
- Aşağıdakiler, kopyalanan Unity Kataloğu işlem hattındaki tablolara yönelik zaman yolculuğu sorguları için geçerlidir:
- Tablo sürümü başlangıçta Hive meta veri deposu tarafından yönetilen bir nesneye yazıldıysa, kopyalanan Unity Kataloğu nesnesi sorgulandığında
timestamp_expression
yan tümcesini kullanan zaman yolculuğu sorguları belirsizdir. - Ancak, tablo sürümü kopyalanan Unity Catalog nesnesine yazılmışsa,
timestamp_expression
ifadesi kullanan zaman yolculuğu sorguları düzgün çalışır. -
version
yan tümcesi kullanan zaman yolculuğu sorguları, klonlanmış Unity Kataloğu nesnesi sorgulandığında, sürüm başlangıçta Hive meta veri deposu yönetilen nesnesine yazıldığında bile düzgün çalışır.
- Tablo sürümü başlangıçta Hive meta veri deposu tarafından yönetilen bir nesneye yazıldıysa, kopyalanan Unity Kataloğu nesnesi sorgulandığında
- Unity Kataloğu ile DLT kullanırken diğer sınırlamalar için bkz. Unity Kataloğu işlem hattı sınırlamaları.
- Unity Kataloğu sınırlamaları için bkz. Unity Kataloğu sınırlamaları.