將數據內嵌至 Azure Databricks Lakehouse
Azure Databricks 提供各種方法,可將資料導入至由 Delta Lake 支援的資料庫。 本文列出支援的匯入工具及指引,並說明如何根據數據源、延遲等準則選擇使用的方法。
攝取方法
您可以使用下列方法將資料內嵌至 Databricks:
- 批次擷取 一組不常處理的數據列
- 串流接收 個別數據列或一組數據列,以便在到達時進行實時處理
擷取的資料會載入到 Delta 資料表中,,然後可用於各種下游資料和 AI 應用案例。 由於 Databricks 的 Lakehouse 架構,您不需要針對不同使用案例重複儲存數據,並且可以利用 Unity Catalog 進行集中式存取控制、稽核、譜系和跨所有數據的探索。
批次匯入
使用批次擷取,您通常會根據排程(例如每天)或手動觸發,將成批次的行數據載入 Databricks 中。 這代表傳統擷取、轉換、載入 (ETL) 使用案例中的「擷取」部分。 您可以使用批次擷取從下列位置載入資料:
- 類似 CSV 的本機檔案
- 雲端物件記憶體,包括 Amazon S3、Azure Data Lake Storage 和 Google Cloud Storage
- Salesforce 等 SaaS 應用程式,以及 SQL Server 等資料庫
批次擷取支援各種不同的檔格式,包括 CSV、TSV、JSON、XML、Avro、ORC、Parquet 和文本檔。
Databricks 同時支援傳統批次擷取和累加批次擷取選項。 雖然傳統批次擷取會在每次執行時處理所有記錄,但累加批次擷取會自動偵測數據源中的新記錄,並忽略已經擷取的記錄。 這表示需要處理較少的數據,因此擷取作業的執行速度會更快,且更有效率地使用計算資源。
傳統 (一次性) 批次擷取
您可以使用新增數據 UI,上傳本機數據檔,或從公用 URL 下載檔案。 請參閱 上傳檔案。
累加批次導入
本節說明支援的累加批次擷取工具。
串流數據表
CREATE STREAMING TABLE
SQL 命令可讓您從雲端物件記憶體以累加方式將數據載入串流數據表。 請參閱 CREATE STREAMING TABLE。
範例:使用串流數據表的累加批次擷取
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
Cloud 物件記憶體連接器
內建雲端物件儲存連接器自動載入器可讓您在抵達 Amazon S3 (S3)、Azure Data Lake Storage Gen 2 (ALDS2) 或 Google Cloud Storage (GCS) 時,以累加且有效率的方式處理新的數據檔。 請參閱 自動載入器。
範例:使用自動加載器進行增量批次擷取
df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data")
.schema("/databricks-datasets/retail-org/customers/schema")
.load("/databricks-datasets/retail-org/customers/")
完全受控連接器
Lakeflow Connect 提供完全受管控的連接器,用於從像 Salesforce 這樣的 SaaS 應用程式和像 SQL Server 這樣的資料庫中提取數據。 受控連接器可使用下列方式:
- Databricks 使用者介面
- Databricks CLI
- Databricks API 介面
- Databricks SDK
- Databricks 資產組合(DAB)
請參閱 Lakeflow Connect。
串流擷取
透過串流擷取,您會在資料列產生時持續載入資料列或批次資料列,讓您能在資料列幾乎即時到達時進行查詢。 您可以使用串流擷取,從 Apache Kafka、Amazon Kinesis、Google Pub/Sub 和 Apache Pulsar 等來源載入串流數據。
Databricks 也支援使用內建連接器進行串流擷取。 這些連接器可讓您在從串流來源送達時,以累加且有效率的方式處理新數據。 請參閱 設定串流資料來源。
範例:從 Kafka 串流引入
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
在 DLT 中進行 批次和串流擷取
Databricks 建議使用 DLT 來建置可靠且可調整的數據處理管線。 DLT 同時支援批次和串流擷取,而且您可以從自動載入器所支援的任何數據源擷取數據。
範例:使用 DLT 的累加批次擷取
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
範例:使用 DLT 從 Kafka 串流擷取
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
導入排程
您可以將數據作為一次性操作、周期性排程或持續性地進行引入。
- 針對近乎即時的串流使用案例,請使用連續模式。
- 增量匯入使用案例的話,可以選擇單次匯入或者設定重複的排程。
資料導入合作夥伴
許多第三方工具支援批次或串流擷取至 Databricks。 Databricks 會驗證各種第三方整合,不過設定來源系統的存取權和內嵌數據的步驟會因工具而異。 如需已驗證的工具清單,請參閱 擷取合作夥伴。 Databricks Partner Connect也展示了一些技術合作夥伴,該平台提供一個 UI,可以簡化第三方工具與 lakehouse 資料的連接。
DIY 匯入
Databricks 提供一般計算平臺。 因此,您可以使用 Databricks 所支援的任何程式設計語言,例如 Python 或 Java 來建立自己的擷取連接器。 您也可以匯入並運用熱門的開放原始碼連接器連結庫,例如數據載入工具、Airbyte 和 Debezium。
攝取替代方案
Databricks 建議針對大部分使用案例進行擷取,因為它會調整以容納高數據量、低延遲查詢和第三方 API 限制。 資料匯入會將資料從來源系統複製到 Azure Databricks,這可能會導致重複資料隨著時間變得過時。 如果您不想複製資料,您可以使用下列工具:
- Lakehouse 聯盟 可讓您查詢外部資料來源,而不需要移動您的資料。
- Delta Sharing 可讓您安全地跨平臺、雲端和區域共享數據。
不過,如果您不想複製數據,請使用 Lakehouse 同盟或 Delta 共用。
何時使用 Delta Sharing
針對下列情況選擇 Delta Sharing:
- 限制數據重複
- 查詢最新的可能數據
使用 Lakehouse 聯盟的時機
針對下列案例選擇 Lakehouse 同盟:
- ETL 管線上的臨時報告或概念驗證工作
選擇引入方法時的考量
考慮 | 指導 |
---|---|
數據源 | 如果數據源有 Lakeflow Connect 原生連接器,這會是內嵌數據的最簡單方式。 若為 Lakeflow Connect 不支援的數據源,請從來源擷取數據,然後使用自動載入器將數據內嵌至 Databricks。 針對本機檔案,請使用 Databricks UI 來上傳數據。 |
延遲 | 如果您想要近乎即時地分析數據,請使用串流來利用累加處理。 透過串流處理,數據可在每個記錄送達時立即進行查詢。 否則,請使用批次匯入。 |
數據移動 | 如果您無法將數據從來源系統複製到 Databricks,請使用 Lakehouse 同盟或 Delta 共用。 |
將數據遷移至 Delta Lake
若要瞭解如何將現有的數據遷移至 Delta Lake,請參閱 將數據遷移至 Delta Lake。
COPY INTO(舊版)
CREATE STREAMING TABLE
SQL 命令是傳統 COPY INTO
SQL 命令的建議替代方案,以便從雲端物件記憶體進行累加擷取。 請參閱 COPY INTO。 如需更可調整且健全的檔案擷取體驗,Databricks 建議 SQL 使用者利用串流數據表,而不是 COPY INTO
。