자습서: 첫 번째 DLT 파이프라인 실행
이 자습서에서는 첫 번째 DLT 파이프라인을 구성하고, 기본 ETL 코드를 작성하고, 파이프라인 업데이트를 실행하는 단계를 안내합니다.
이 자습서의 모든 단계는 Unity 카탈로그를 사용하도록 설정된 작업 영역을 위해 설계되었습니다. 레거시 Hive 메타스토어와 함께 작동하도록 DLT 파이프라인을 구성할 수도 있습니다. 레거시 Hive 메타스토어와 함께 DLT 파이프라인 사용참조하세요.
메모
이 자습서에는 Databricks Notebook을 사용하여 새 파이프라인 코드를 개발하고 유효성을 검사하기 위한 지침이 있습니다. Python 또는 SQL 파일의 소스 코드를 사용하여 파이프라인을 구성할 수도 있습니다.
DLT 구문을 사용하여 작성된 소스 코드가 이미 있는 경우 코드를 실행하도록 파이프라인을 구성할 수 있습니다. DLT 파이프라인구성을 참조하세요.
Databricks SQL에서 완전히 선언적인 SQL 구문을 사용하여 구체화된 뷰 및 스트리밍 테이블에 대한 새로 고침 일정을 Unity 카탈로그 관리 개체로 등록하고 설정할 수 있습니다. Databricks SQL 구체화된 뷰 사용 및 Databricks SQL 스트리밍 테이블을 사용하여 데이터 로드참조하세요.
예: 뉴욕 아기 이름 데이터 수집 및 처리
이 문서의 예제에서는 뉴욕 주 아기 이름레코드를 포함하는 공개적으로 사용 가능한 데이터 세트를 사용합니다. 이 예제에서는 DLT 파이프라인을 사용하여 다음을 수행합니다.
- 볼륨에서 테이블로 원시 CSV 데이터를 읽습니다.
- 수집 테이블에서 레코드를 읽고 DLT 기대치를 사용하여 정리된 데이터를 포함하는 새 테이블을 만듭니다.
- 정리된 레코드를 파생 데이터 세트를 만드는 DLT 쿼리에 대한 입력으로 사용합니다.
이 코드는 medallion 아키텍처의 간소화된 예제를 보여 줍니다. 메달리언 레이크하우스 아키텍처란 무엇입니까?.
이 예제의 구현은 Python 및 SQL에 대해 제공됩니다. 단계에 따라 새 파이프라인 및 Notebook을 만든 다음 제공된 코드를 복사하여 붙여넣습니다.
요구 사항
- 파이프라인을 시작하려면 클러스터 만들기 권한 또는 DLT 클러스터를 정의하는 클러스터 정책에 대한 액세스 권한이 있어야 합니다. DLT 런타임은 파이프라인을 실행하기 전에 클러스터를 만들고 올바른 권한이 없으면 실패합니다.
- 모든 사용자는 기본적으로 서버리스 파이프라인을 사용하여 업데이트를 트리거할 수 있습니다. 서버리스는 계정 수준에서 사용하도록 설정해야 하며 작업 영역 지역에서 사용할 수 없을 수도 있습니다. 서버리스 컴퓨팅 사용을 참조하세요.
이 자습서의 예제에서는 Unity 카탈로그사용합니다. Databricks는 대상 스키마에 여러 데이터베이스 개체가 생성되기 때문에 이 자습서를 실행하기 위해 새 스키마를 만드는 것이 좋습니다.
- 카탈로그에서 새 스키마를 만들려면
ALL PRIVILEGES
또는USE CATALOG
및CREATE SCHEMA
권한이 있어야 합니다. - 새 스키마를 만들 수 없는 경우 기존 스키마에 대해 이 자습서를 실행합니다. 다음과 같은 권한이 있어야 합니다.
- 부모 카탈로그에
USE CATALOG
. - 대상 스키마에 대한
ALL PRIVILEGES
또는USE SCHEMA
,CREATE MATERIALIZED VIEW
,CREATE TABLE
권한입니다.
- 부모 카탈로그에
- 이 자습서에서는 볼륨을 사용하여 샘플 데이터를 저장합니다. Databricks는 이 자습서에 대한 새 볼륨을 만드는 것이 좋습니다. 이 자습서에 대한 새 스키마를 만드는 경우 해당 스키마에 새 볼륨을 만들 수 있습니다.
- 기존 스키마에서 새 볼륨을 만들려면 다음 권한이 있어야 합니다.
- 부모 카탈로그에 대한
USE CATALOG
. - 대상 스키마에 대한 권한을
ALL PRIVILEGES
또는USE SCHEMA
및CREATE VOLUME
.
- 부모 카탈로그에 대한
- 필요에 따라 기존 볼륨을 사용할 수 있습니다. 다음과 같은 권한이 있어야 합니다.
- 부모 카탈로그에 대한
USE CATALOG
. -
USE SCHEMA
은 부모 스키마에 대한 것입니다. - 대상 볼륨에서
ALL PRIVILEGES
또는READ VOLUME
과WRITE VOLUME
가 위치한다.
- 부모 카탈로그에 대한
- 기존 스키마에서 새 볼륨을 만들려면 다음 권한이 있어야 합니다.
이러한 권한을 설정하려면 Databricks 관리자에게 문의하세요. Unity 카탈로그 권한에 대한 자세한 내용은 Unity 카탈로그 권한 및 보안 개체참조하세요.
- 카탈로그에서 새 스키마를 만들려면
0단계: 데이터 다운로드
이 예제에서는 Unity 카탈로그 볼륨에서 데이터를 로드합니다. 다음 코드는 CSV 파일을 다운로드하여 지정된 볼륨에 저장합니다. 새 Notebook을 열고 다음 코드를 실행하여 이 데이터를 지정된 볼륨에 다운로드합니다.
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
<catalog-name>
, <schema-name>
및 <volume-name>
를 Unity 카탈로그 볼륨의 카탈로그 이름, 스키마 이름 및 볼륨 이름으로 바꾸세요. 제공된 코드는 이러한 개체가 없는 경우 지정된 스키마 및 볼륨을 만들려고 시도합니다. Unity 카탈로그에서 개체를 만들고 쓸 수 있는 적절한 권한이 있어야 합니다.
요구 사항참조하세요.
메모
자습서를 계속하기 전에 이 Notebook이 성공적으로 실행되었는지 확인합니다. 이 Notebook을 파이프라인의 일부로 구성하지 마세요.
1단계: 파이프라인 만들기
DLT는 DLT 구문을 사용하여 Notebook 또는 파일(소스 코드)에 정의된 종속성을 확인하여 파이프라인을 만듭니다. 각 소스 코드 파일에는 하나의 언어만 포함될 수 있지만 파이프라인에 여러 언어별 Notebook 또는 파일을 추가할 수 있습니다.
중요하다
소스 코드 필드에 자산을 구성하지 마세요. 이 필드를 검은색으로 두면 소스 코드 작성을 위한 Notebook이 만들어지고 구성됩니다.
이 자습서의 지침에서는 서버리스 컴퓨팅 및 Unity 카탈로그를 사용합니다. 이러한 지침에 지정되지 않은 모든 구성 옵션에 대한 기본 설정을 사용합니다.
메모
작업 영역에서 서버리스를 사용하도록 설정하거나 지원하지 않는 경우 기본 컴퓨팅 설정을 사용하여 작성된 자습서를 완료할 수 있습니다. 만들기 파이프라인 UI의 대상 섹션에서 Storage 옵션 아래에서 Unity 카탈로그 수동으로 선택해야 합니다.
새 파이프라인을 구성하려면 다음을 수행합니다.
- 사이드바에서 DLT클릭합니다.
- 파이프라인만들기를 클릭합니다.
- 파이프라인 이름에서 고유한 파이프라인 이름을 입력합니다.
- 서버리스 확인란을 선택합니다.
- 대상에서 테이블이 게시되는 Unity Catalog 위치를 구성하려면 카탈로그 및 스키마를 선택합니다.
-
고급구성 추가를 클릭한 다음 다음 매개 변수 이름을 사용하여 데이터를 다운로드한 카탈로그, 스키마 및 볼륨에 대한 파이프라인 매개 변수를 정의합니다.
my_catalog
my_schema
my_volume
- 클릭합니다만들기.
새 파이프라인에 대한 파이프라인 UI가 나타납니다. 소스 코드 노트북은 파이프라인에 대해 자동으로 생성되고 구성됩니다.
노트는 사용자 디렉터리 안에 새로운 디렉터리가 생성되어 만들어집니다. 새 디렉터리 및 파일의 이름이 파이프라인의 이름과 일치합니다. 예를 들어 /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
이 Notebook에 액세스하기 위한 링크는 파이프라인 세부 정보 패널의 소스 코드 필드 아래에 있습니다. 다음 단계로 진행하기 전에 링크를 클릭하여 전자 필기장을 엽니다.
2단계: Python 또는 SQL을 사용하여 Notebook에서 구체화된 뷰 및 스트리밍 테이블 선언
Datbricks Notebook을 사용하여 DLT 파이프라인에 대한 소스 코드를 대화형으로 개발하고 유효성을 검사할 수 있습니다. 이 기능을 사용하려면 Notebook을 파이프라인에 연결해야 합니다. 방금 만든 파이프라인에 새로 만든 Notebook을 연결하려면 다음을 수행합니다.
- 오른쪽 위에서 연결 클릭하여 컴퓨팅 구성 메뉴를 엽니다.
- 1단계에서 만든 파이프라인의 이름을 마우스로 가리킵니다.
- 연결클릭합니다.
사용자 인터페이스(UI)가 오른쪽 위에 유효성 검사 및 시작 버튼을 포함하도록 변경됩니다. 노트북에서 파이프라인 코드 개발을 지원하기 위한 자세한 내용은 노트북에서 DLT 파이프라인 개발 및 디버깅하기를 참조하세요.
중요하다
- DLT 파이프라인은 준비 단계에서 노트북의 모든 셀을 평가합니다. 다목적 컴퓨트에 대해 실행되거나 작업으로 예약된 노트북과 달리, 파이프라인은 셀이 지정된 순서대로 실행된다는 보장이 없습니다.
- Notebook은 단일 프로그래밍 언어만 포함할 수 있습니다. 파이프라인 소스 코드 Notebook에서 Python 및 SQL 코드를 혼합하지 마세요.
Python 또는 SQL을 사용하여 코드를 개발하는 방법에 대한 자세한 내용은 Python 사용하여 파이프라인 코드 개발 또는 SQL 사용하여 파이프라인 코드 개발참조하세요.
예제 파이프라인 코드
이 자습서의 예제를 구현하려면 다음 코드를 복사하여 파이프라인의 소스 코드로 구성된 Notebook의 셀에 붙여넣습니다.
제공된 코드는 다음을 수행합니다.
- 필요한 모듈을 가져옵니다(Python에만 해당).
- 파이프라인 구성 중에 정의된 매개 변수를 참조합니다.
- 볼륨으로부터 데이터를 수집하는
baby_names_raw
이라는 스트리밍 테이블을 정의합니다. - 수집된 데이터의 유효성을 검사하는
baby_names_prepared
명명된 구체화된 뷰를 정의합니다. - 데이터에 대한 고도로 정제된 뷰를 가진 구체화된 뷰
top_baby_names_2021
를 정의합니다.
파이썬
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
3단계: 파이프라인 업데이트 시작
파이프라인 업데이트를 시작하려면 Notebook UI의 오른쪽 위에 있는 시작 단추를 클릭합니다.
예제 노트북
다음 Notebook에는 이 문서에 제공된 것과 동일한 코드 예제가 포함되어 있습니다. 이러한 Notebook은 이 문서의 단계와 동일한 요구 사항을 갖습니다. 요구 사항참조하세요.
Notebook을 가져오려면 다음 단계를 완료합니다.
- Notebook UI를 엽니다.
- + 새>노트북을 클릭합니다.
- 빈 노트가 열립니다.
- 파일>가져오기...클릭합니다. 가져오기 대화 상자가 나타납니다.
- 에서가져오기용 URL 옵션을 선택합니다.
- Notebook의 URL을 붙여넣습니다.
- 가져오기클릭합니다.
이 자습서에서는 DLT 파이프라인을 구성하고 실행하기 전에 데이터 설정 Notebook을 실행해야 합니다. 다음 노트북을 가져오고, 노트북을 컴퓨팅 리소스에 연결한 후, my_catalog
, my_schema
, my_volume
에 필요한 변수를 입력하고 모두 실행클릭하세요.
파이프라인용 데이터 다운로드 자습서
노트북 가져오기
다음 Notebook은 Python 또는 SQL의 예제를 제공합니다. 전자 필기장을 가져오면 사용자 홈 디렉터리에 저장됩니다.
아래 Notebook 중 하나를 가져온 후 파이프라인을 만드는 단계를 완료하지만 소스 코드 파일 선택기를 사용하여 다운로드한 Notebook을 선택합니다. 소스 코드로 구성된 노트북을 사용하여 파이프라인을 만든 후, 파이프라인 UI에서 시작을 클릭하면 업데이트가 시작됩니다.