Importera Python-moduler från Git-mappar eller arbetsytefiler
Du kan lagra Python-kod i Databricks Git-mappar eller i arbetsytefiler och sedan importera Python-koden till dina DLT-pipelines. Mer information om hur du arbetar med moduler i Git-mappar eller arbetsytefiler finns i Arbeta med Python- och R-moduler.
Note
Du kan inte importera källkod från en notebook-fil som lagras i en Databricks Git-mapp eller en arbetsytefil. Lägg i stället till anteckningsboken direkt när du skapar eller redigerar en pipeline. Se Konfigurera en DLT-pipeline.
Importera en Python-modul till en DLT-pipeline
I följande exempel visas hur du importerar datamängdsfrågor som Python-moduler från arbetsytefiler. Även om det här exemplet beskriver hur du använder arbetsytefiler för att lagra pipelinens källkod, kan du använda den med källkod som lagras i en Git-mapp.
Använd följande steg för att köra det här exemplet:
Klicka på ikonen
Arbetsyta i sidofältet på din Azure Databricks-arbetsyta för att öppna arbetsyte-webbläsaren.
Använd arbetsytans webbläsare för att välja en katalog för Python-modulerna.
Klicka på
i den högra kolumnen i den valda katalogen och klicka på Skapa > Fil.
Ange ett namn på filen, till exempel
clickstream_raw_module.py
. Filredigeraren öppnas. Om du vill skapa en modul för att läsa källdata i en tabell anger du följande i redigeringsfönstret:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Om du vill skapa en modul som skapar en ny tabell som innehåller förberedda data skapar du en ny fil i samma katalog, anger ett namn på filen, till exempel
clickstream_prepared_module.py
och anger följande i det nya redigeringsfönstret:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Skapa sedan en pipeline notebook-fil. Gå till din Azure Databricks-landningssida och välj Skapa en notebook, eller klicka på
Ny i sidofältet och välj Notebook. Du kan också skapa anteckningsboken i arbetsytans webbläsare genom att klicka på
och sedan på Skapa > anteckningsbok.
Ge anteckningsboken ett namn och bekräfta Python- är standardspråket.
Klicka på Skapa.
Ange exempelkoden i notebook-filen.
Observera
Om ditt notebook importerar moduler eller paket från en sökväg i arbetsytans filer eller en sökväg i Git-mappar som skiljer sig från notebook-katalogen, måste du manuellt lägga till sökvägen till filerna med hjälp av
sys.path.append()
.Om du importerar en fil från en Git-mapp måste du lägga till
/Workspace/
framför sökvägen. Till exempelsys.path.append('/Workspace/...')
. Om du utelämnar/Workspace/
från sökvägen resulterar det i ett fel.Om modulerna eller paketen lagras i samma katalog som notebook-filen behöver du inte lägga till sökvägen manuellt. Du behöver inte heller lägga till sökvägen manuellt när du importerar från rotkatalogen i en Git-mapp eftersom rotkatalogen läggs till automatiskt i sökvägen.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("catalog_name.schema_name.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Ersätt
<module-path>
med sökvägen till katalogen som innehåller De Python-moduler som ska importeras.Skapa en pipeline med hjälp av den nya notebook-filen.
För att köra pipelinen, på sidan Pipeline-detaljer, klickar du på Starta.
Du kan också importera Python-kod som ett paket. Följande kodfragment från en DLT-notebook-fil importerar test_utils
-paketet från katalogen dlt_packages
i samma katalog som notebook-filen. Katalogen dlt_packages
innehåller filerna test_utils.py
och __init__.py
, och test_utils.py
definierar funktionen create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)