البرنامج التعليمي: Delta Lake
يقدم هذا البرنامج التعليمي عمليات Delta Lake الشائعة على Azure Databricks، بما في ذلك ما يلي:
- إنشاء جدول.
- Upsert إلى جدول.
- القراءة من جدول.
- عرض محفوظات الجدول.
- الاستعلام عن إصدار سابق من جدول.
- تحسين جدول.
- إضافة فهرس ترتيب Z.
- تفريغ الملفات غير المؤجلة.
يمكنك تشغيل مثال التعليمات البرمجية Python وSc scala وSQL في هذه المقالة من داخل دفتر ملاحظات مرفق بمورد حساب Azure Databricks مثل نظام مجموعة. يمكنك أيضا تشغيل التعليمات البرمجية SQL في هذه المقالة من داخل استعلام مقترن بمستودع SQL في Databricks SQL.
إعداد بيانات المصدر
يعتمد هذا البرنامج التعليمي على مجموعة بيانات تسمى People 10 M. يحتوي على 10 ملايين سجل وهمي يحتوي على حقائق عن الناس، مثل الأسماء الأولى والأخيرة وتاريخ الميلاد والراتب. يفترض هذا البرنامج التعليمي أن مجموعة البيانات هذه موجودة في وحدة تخزين كتالوج Unity المقترنة بمساحة عمل Azure Databricks المستهدفة.
للحصول على مجموعة بيانات People 10 M لهذا البرنامج التعليمي، قم بما يلي:
- انتقل إلى صفحة الأشخاص 10 M في Kaggle.
- انقر فوق تنزيل لتنزيل ملف باسم
archive.zip
إلى جهازك المحلي. - استخراج الملف المسمى
export.csv
منarchive.zip
الملف.export.csv
يحتوي الملف على بيانات هذا البرنامج التعليمي.
لتحميل export.csv
الملف إلى وحدة التخزين، قم بما يلي:
- على الشريط الجانبي، انقر فوق كتالوج.
- في مستكشف الكتالوج، استعرض وصولا إلى وحدة التخزين وافتحها حيث تريد تحميل
export.csv
الملف. - انقر فوق تحميل إلى وحدة التخزين هذه.
- اسحب الملف الموجود على جهازك المحلي وأفلته أو استعرض وصولا إليه وحدده
export.csv
. - انقر فوق تحميل.
في أمثلة التعليمات البرمجية التالية، استبدل /Volumes/main/default/my-volume/export.csv
بالمسار إلى export.csv
الملف في وحدة التخزين الهدف.
إنشاء جدول
تستخدم جميع الجداول التي تم إنشاؤها على Azure Databricks Delta Lake بشكل افتراضي. توصي Databricks باستخدام الجداول المدارة في كتالوج Unity.
في مثال التعليمات البرمجية السابق وأمثلة التعليمات البرمجية التالية، استبدل اسم main.default.people_10m
الجدول بالكتالوج والمخطط واسم الجدول المكون من ثلاثة أجزاء الهدف في كتالوج Unity.
إشعار
Delta Lake هو الافتراضي لجميع أوامر القراءة والكتابة وإنشاء الجدول Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
تنشئ العمليات السابقة جدولا مدارا جديدا. للحصول على معلومات حول الخيارات المتوفرة عند إنشاء جدول دلتا، راجع إنشاء جدول.
في Databricks Runtime 13.3 LTS والإصدارات الأحدث، يمكنك استخدام CREATE TABLE LIKE لإنشاء جدول Delta فارغ جديد يكرر خصائص المخطط والجدول لجدول Delta المصدر. يمكن أن يكون هذا مفيدا بشكل خاص عند ترقية الجداول من بيئة تطوير إلى إنتاج، كما هو موضح في مثال التعليمات البرمجية التالي:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
لإنشاء جدول فارغ، يمكنك أيضا استخدام DeltaTableBuilder
واجهة برمجة التطبيقات في Delta Lake ل Python وSc scala. مقارنة بواجهات برمجة تطبيقات DataFrameWriter المكافئة، تسهل واجهات برمجة التطبيقات هذه تحديد معلومات إضافية مثل تعليقات الأعمدة وخصائص الجدول والأعمدة التي تم إنشاؤها.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert إلى جدول
لدمج مجموعة من التحديثات والإدخالات في جدول Delta موجود، يمكنك استخدام DeltaTable.merge
الأسلوب ل Python وSc scala وعلامتي MERGE INTO ل SQL. على سبيل المثال، يأخذ المثال التالي البيانات من الجدول المصدر ويدمجها في جدول Delta الهدف. عند وجود صف مطابق في كلا الجدولين، يقوم Delta Lake بتحديث عمود البيانات باستخدام التعبير المحدد. عندما لا يوجد صف مطابق، يضيف Delta Lake صفا جديدا. تعرف هذه العملية باسم upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
في SQL، إذا قمت بتحديد *
، يقوم هذا بتحديث أو إدراج كافة الأعمدة في الجدول الهدف، على افتراض أن الجدول المصدر يحتوي على نفس الأعمدة مثل الجدول الهدف. إذا لم يكن الجدول الهدف يحتوي على نفس الأعمدة، يطرح الاستعلام خطأ تحليل.
يجب تحديد قيمة لكل عمود في الجدول عند تنفيذ عملية إدراج (على سبيل المثال، عند عدم وجود صف مطابق في مجموعة البيانات الموجودة). ومع ذلك، لا تحتاج إلى تحديث كافة القيم.
لمشاهدة النتائج، استعلم عن الجدول.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
قراءة جدول
يمكنك الوصول إلى البيانات في جداول Delta حسب اسم الجدول أو مسار الجدول، كما هو موضح في الأمثلة التالية:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
الكتابة إلى جدول
يستخدم Delta Lake بناء الجملة القياسي لكتابة البيانات إلى الجداول.
لإضافة بيانات جديدة تلقائيا إلى جدول Delta موجود، استخدم وضع الإلحاق كما هو موضح في الأمثلة التالية:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
لاستبدال كافة البيانات في جدول، استخدم وضع الكتابة فوق كما في الأمثلة التالية:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
تحديث جدول
يمكنك تحديث البيانات التي تطابق دالة تقييم في جدول Delta. على سبيل المثال، في جدول المثال people_10m
، لتغيير اختصار في gender
العمود من M
أو F
إلى Male
أو Female
، يمكنك تشغيل ما يلي:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
حذف من جدول
يمكنك إزالة البيانات التي تطابق دالة تقييم من جدول Delta. على سبيل المثال، في جدول المثال people_10m
، لحذف كافة الصفوف المقابلة للأشخاص الذين يعانون من قيمة في birthDate
العمود من قبل 1955
، يمكنك تشغيل ما يلي:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
هام
يزيل الحذف البيانات من أحدث إصدار من جدول Delta ولكنه لا يزيلها من التخزين الفعلي حتى يتم تفريغ الإصدارات القديمة بشكل صريح. راجع الفراغ للحصول على التفاصيل.
عرض محفوظات الجدول
لعرض محفوظات جدول، يمكنك استخدام DeltaTable.history
الأسلوب ل Python وSc scala، وبيان وصف المحفوظات في SQL، الذي يوفر معلومات المصدر، بما في ذلك إصدار الجدول والعملية والمستخدم وما إلى ذلك، لكل كتابة إلى جدول.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
الاستعلام عن إصدار سابق من الجدول (السفر عبر الزمن)
يسمح لك السفر عبر الوقت في Delta Lake بالاستعلام عن لقطة قديمة لجدول Delta.
للاستعلام عن إصدار أقدم من جدول، حدد إصدار الجدول أو الطابع الزمني. على سبيل المثال، للاستعلام عن الإصدار 0 أو الطابع 2024-05-15T22:43:15.000+00:00Z
الزمني من المحفوظات السابقة، استخدم ما يلي:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
بالنسبة للطوابع الزمنية، يتم قبول سلاسل التاريخ أو الطابع الزمني فقط، على سبيل المثال، "2024-05-15T22:43:15.000+00:00"
أو "2024-05-15 22:43:15"
.
تسمح لك خيارات DataFrameReader بإنشاء DataFrame من جدول Delta تم إصلاحه إلى إصدار أو طابع زمني معين للجدول، على سبيل المثال:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
للحصول على التفاصيل، راجع العمل مع محفوظات جدول Delta Lake.
تحسين جدول
بعد إجراء تغييرات متعددة على جدول، قد يكون لديك الكثير من الملفات الصغيرة. لتحسين سرعة قراءة الاستعلامات، يمكنك استخدام عملية التحسين لطي الملفات الصغيرة إلى ملفات أكبر:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
ترتيب Z حسب الأعمدة
لتحسين أداء القراءة بشكل أكبر، يمكنك تجميع المعلومات ذات الصلة في نفس مجموعة الملفات حسب ترتيب z. تستخدم خوارزميات تخطي بيانات Delta Lake هذا الترتيب لتقليل كمية البيانات التي تحتاج إلى قراءة بشكل كبير. ل z-order data، يمكنك تحديد الأعمدة المراد ترتيبها في ترتيب z حسب العملية. على سبيل المثال، للترشيح حسب gender
، قم بتشغيل:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
للحصول على المجموعة الكاملة من الخيارات المتوفرة عند تشغيل عملية التحسين، راجع تحسين تخطيط ملف البيانات.
تنظيف اللقطات باستخدام VACUUM
يوفر Delta Lake عزل اللقطة للقراءات، ما يعني أنه من الآمن تشغيل عملية تحسين حتى أثناء قيام المستخدمين أو الوظائف الأخرى بالاستعلام عن الجدول. ومع ذلك، في النهاية، يجب تنظيف اللقطات القديمة. يمكنك القيام بذلك عن طريق تشغيل عملية التفريغ:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
للحصول على تفاصيل حول استخدام عملية التفريغ بشكل فعال، راجع إزالة ملفات البيانات غير المستخدمة باستخدام الفراغ.