Git klasörlerinden veya çalışma alanı dosyalarından Python modüllerini içeri aktarma
Python kodunu Databricks Git klasörlerinde veya çalışma alanı dosyalarında depolayabilir ve ardından bu Python kodunu DLT işlem hatlarınıza aktarabilirsiniz. Git klasörleri veya çalışma alanı dosyalarındaki modüllerle çalışma hakkında daha fazla bilgi için bkz. Python ve R modülleriyle çalışma.
Not
Databricks Git klasöründe veya çalışma alanı dosyasında depolanan bir not defterinden kaynak kodu içeri aktaramazsınız. Bunun yerine, işlem hattı oluştururken veya düzenlerken not defterini doğrudan ekleyin. Bakınız DLT işlem hattını yapılandırma.
Python modülünü DLT işlem hattına aktarma
Aşağıdaki örnek, çalışma alanı dosyalarından veri kümesi sorgularını Python modülleri olarak içeri aktarmayı gösterir. Bu örnekte işlem hattı kaynak kodunu depolamak için çalışma alanı dosyalarının kullanılması açıklanmış olsa da, bunu git klasöründe depolanan kaynak koduyla kullanabilirsiniz.
Bu örneği çalıştırmak için aşağıdaki adımları kullanın:
Azure Databricks çalışma alanınızın kenar çubuğunda
Çalışma Alanı'ye tıklayarak çalışma alanı tarayıcısını açın.
Python modülleri için bir dizin seçmek için çalışma alanı tarayıcısını kullanın.
Seçili dizinin en sağdaki sütununda
tıklayın ve > Dosyası Oluşturöğesine tıklayın.
Dosya için bir ad girin; örneğin,
clickstream_raw_module.py
. Dosya düzenleyicisi açılır. Kaynak verileri tabloya okumak üzere bir modül oluşturmak için düzenleyici penceresine aşağıdakileri girin:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Hazırlanan verileri içeren yeni bir tablo oluşturan bir modül oluşturmak için, aynı dizinde yeni bir dosya oluşturun, dosya için bir ad (örneğin,
clickstream_prepared_module.py
) girin ve yeni düzenleyici penceresine aşağıdakileri girin:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Ardından bir işlem hattı defteri oluşturun. Azure Databricks giriş sayfanıza gidin ve Not Defteri Oluşturseçeneğini belirleyin veya kenar çubuğunda
Yeni SimgeYeni ve ardından Not Defteriseçeneklerini belirleyin. Not defterini çalışma alanı tarayıcısında
ve > Not Defteri Oluşturtıklayarak da oluşturabilirsiniz.
Not defterinizi adlandırın ve varsayılan dil Python olduğunu onaylayın.
Oluştur'utıklayın.
Not defterine örnek kodu girin.
Not
Not defteriniz çalışma alanı dosyaları yolundan veya not defteri dizininden farklı bir Git klasörleri yolundan modülleri veya paketleri içeri aktarıyorsa,
sys.path.append()
kullanarak yolu dosyalara el ile eklemeniz gerekir.Git klasöründen bir dosya içeri aktarıyorsanız,
/Workspace/
'ı yolun önüne eklemelisiniz. Örneğin,sys.path.append('/Workspace/...')
. Path'ten/Workspace/
'ı çıkarmak bir hata ile sonuçlanır.Modüller veya paketler not defteriyle aynı dizinde depolanıyorsa yolu el ile eklemeniz gerekmez. Ayrıca git klasörünün kök dizininden içeri aktarırken yolu el ile eklemeniz gerekmez çünkü kök dizin yola otomatik olarak eklenir.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("catalog_name.schema_name.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
<module-path>
öğesini, içe aktarılacak Python modüllerini içeren dizinin yoluyla değiştirin.Yeni not defterini kullanarak bir işlem hattı oluşturun.
İşlem hattını çalıştırmak için, İşlem Hattı Ayrıntıları sayfasında Başlattıklayın.
Python kodunu paket olarak da içeri aktarabilirsiniz. Aşağıdaki DLT not defterinden kod parçacığı, test_utils
paketini not defteriyle aynı dizin içindeki dlt_packages
dizininden içeri aktarır.
dlt_packages
dizini test_utils.py
ve __init__.py
dosyalarını içerir ve test_utils.py
işlevi create_test_table()
tanımlar:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)