Öğretici: Delta Lake
Bu öğretici, aşağıdakiler de dahil olmak üzere Azure Databricks'te yaygın olarak kullanılan Delta Lake işlemlerini tanıtır:
- Bir tablo oluşturun.
- Bir tabloya güncelleştirin/ekleyin.
- Bir tablodan okuyun.
- Tablo geçmişini görüntüleyin.
- Tablonun önceki bir sürümünü sorgulayın.
- Tabloyu iyileştirin.
- Bir Z sırası dizini ekleyin.
- Başvurulmayan dosyaları vakumlayın.
Bu makaledeki Python, Scala ve SQL kodu örneğini, küme gibi bir Azure Databricks işlem kaynağına bağlı bir not defterinin içinden çalıştırabilirsiniz. Bu makaledeki SQL kodunu Databricks SQL'deki bir SQL ambarıile ilişkilendirilmiş bir sorgunun içinden de çalıştırabilirsiniz.
Kaynak verileri hazırlama
Bu öğretici, Kişiler 10 M adlı bir veri kümesini temel alır. Ad ve soyadı, doğum tarihi ve maaş gibi insanlar hakkında gerçekler barındıran 10 milyon kurgusal kayıt içerir. Bu öğreticide, bu veri kümesinin hedef Azure Databricks çalışma alanınızla ilişkili bir Unity Kataloğu biriminde olduğu varsayılır.
Bu öğreticide Kişiler 10 M veri kümesini almak için aşağıdakileri yapın:
- Kaggle'da Kişiler 10 M sayfasına gidin.
- Adlı dosyayı yerel makinenize indirmek için İndir'e
archive.zip
. - adlı
export.csv
dosyayı dosyasındanarchive.zip
ayıklayın. Dosya buexport.csv
öğreticinin verilerini içerir.
Dosyayı birime yüklemek export.csv
için aşağıdakileri yapın:
- Kenar çubuğunda Katalog'a tıklayın.
- Katalog Gezgini'nde, dosyayı karşıya yüklemek
export.csv
istediğiniz birimi bulun ve açın. - Bu birime yükle'ye tıklayın.
- Yerel makinenizdeki dosyayı sürükleyip bırakın veya dosyaya
export.csv
göz atın ve seçin. - Karşıya Yükle'ye tıklayın.
Aşağıdaki kod örneklerinde öğesini hedef biriminizdeki dosyanın yoluyla /Volumes/main/default/my-volume/export.csv
değiştirinexport.csv
.
Tablo oluşturma
Azure Databricks'te oluşturulan tüm tablolar varsayılan olarak Delta Lake kullanır. Databricks, Unity Kataloğu yönetilen tablolarının kullanılmasını önerir.
Önceki kod örneğinde ve aşağıdaki kod örneklerinde, Unity Kataloğu'nda tablo adını main.default.people_10m
hedef üç bölümlü kataloğunuz, şemanız ve tablo adınızla değiştirin.
Not
Delta Lake, Azure Databricks'in tüm okuma, yazma ve tablo oluşturma komutları için varsayılandır.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Önceki işlemler yeni bir yönetilen tablo oluşturur. Delta tablosu oluştururken kullanılabilen seçenekler hakkında bilgi için bkz. CREATE TABLE.
Databricks Runtime 13.3 LTS ve üzeri sürümlerde, CREATE TABLE LIKE kullanarak kaynak Delta tablosunun şema ve tablo özelliklerini çoğaltan yeni bir boş Delta tablosu oluşturabilirsiniz. Bu, aşağıdaki kod örneğinde gösterildiği gibi tabloları geliştirme ortamından üretime yükseltme sırasında özellikle yararlı olabilir:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Boş bir tablo oluşturmak için Python ve Scala için Delta Lake'teki API'yi de kullanabilirsinizDeltaTableBuilder
. Eşdeğer DataFrameWriter API'leriyle karşılaştırıldığında, bu API'ler sütun açıklamaları, tablo özellikleri ve oluşturulan sütunlar gibi ek bilgileri belirtmeyi kolaylaştırır.
Önemli
Bu özellik Genel Önizlemededir.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Tabloya upsert
Bir dizi güncelleştirme ve eklemeyi var olan bir Delta tablosuyla birleştirmek için Python ve
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
SQL'de, belirtirseniz *
, kaynak tablonun hedef tabloyla aynı sütunlara sahip olduğu varsayılarak, bu hedef tablodaki tüm sütunları güncelleştirir veya ekler. Hedef tabloda aynı sütunlar yoksa sorgu bir çözümleme hatası oluşturur.
Ekleme işlemi gerçekleştirirken (örneğin, mevcut veri kümesinde eşleşen satır olmadığında) tablonuzdaki her sütun için bir değer belirtmeniz gerekir. Ancak, tüm değerleri güncelleştirmeniz gerekmez.
Sonuçları görmek için tabloyu sorgula.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Tablo okuma
Delta tablolarındaki verilere, aşağıdaki örneklerde gösterildiği gibi tablo adı veya tablo yolu ile erişebilirsiniz:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Tabloya yazma
Delta Lake, tablolara veri yazmak için standart söz dizimi kullanır.
Mevcut Delta tablosuna atomik olarak yeni veri eklemek için aşağıdaki örneklerde gösterildiği gibi ekleme modunu kullanın:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Bir tablodaki tüm verileri değiştirmek için, aşağıdaki örneklerde olduğu gibi üzerine yazma modunu kullanın:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Tabloyu güncelleştirme
Delta tablosundaki bir koşulla eşleşen verileri güncelleştirebilirsiniz. Örneğin, örnek people_10m
tabloda, sütundaki gender
bir kısaltmayı M
veya F
olarak değiştirmek için Male
Female
aşağıdakileri çalıştırabilirsiniz:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Tablodan silme
Delta tablosundan koşulla eşleşen verileri kaldırabilirsiniz. Örneğin, örnek people_10m
tabloda, sütununda değeri olan kişilere karşılık gelen tüm satırları öncesinden birthDate
1955
silmek için aşağıdakileri çalıştırabilirsiniz:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Önemli
Silme işlemi Delta tablosunun en son sürümündeki verileri kaldırır ancak eski sürümler açıkça vakumlanana kadar fiziksel depolama alanından kaldırmaz. Ayrıntılar için bkz . vakum .
Tablo geçmişini görüntüleme
Tablonun geçmişini görüntülemek için, DeltaTable.history
ve Scalaiçin yöntemini ve sql'de tablo sürümü, işlem, kullanıcı vb. dahil olmak üzere bir tabloya her yazma işlemi için kanıtlanmış bilgiler sağlayan DESCRIBE HISTORY deyimini kullanırsınız.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Tablonun önceki bir sürümünü sorgulama (zaman yolculuğu)
Delta Lake zaman yolculuğu, Delta tablosunun eski bir anlık görüntüsünü sorgulamanıza olanak tanır.
Tablonun eski bir sürümünü sorgulamak için tablonun sürümünü veya zaman damgasını belirtin. Örneğin, önceki geçmişe ait sürüm 0 veya zaman damgasını 2024-05-15T22:43:15.000+00:00Z
sorgulamak için aşağıdakileri kullanın:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Zaman damgaları için, veya gibi "2024-05-15T22:43:15.000+00:00"
"2024-05-15 22:43:15"
yalnızca tarih veya zaman damgası dizeleri kabul edilir.
DataFrameReader seçenekleri, bir Delta tablosundan, tablonun belirli bir sürümüne veya zaman damgasına sabit bir DataFrame oluşturmanıza olanak sağlar, örneğin:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Ayrıntılar için bkz . Delta Lake tablo geçmişiyle çalışma.
Tabloyu iyileştirme
Tabloda birden çok değişiklik yaptıktan sonra çok sayıda küçük dosyanız olabilir. Okuma sorgularının hızını artırmak için iyileştirme işlemini kullanarak küçük dosyaları daha büyük dosyalara daraltabilirsiniz:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Sütunlara göre Z sırası
Okuma performansını daha da geliştirmek için, z-ordering ile aynı dosya kümesindeki ilgili bilgileri birlikte kullanabilirsiniz. Delta Lake veri atlama algoritmaları, okunması gereken veri miktarını önemli ölçüde azaltmak için bu birlikte bulundurmayı kullanır. Verileri z sıralamak için, z sırasına göre işlemde sıralanması gereken sütunları belirtirsiniz. Örneğin, ile gender
birlikte kullanmak için şunu çalıştırın:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
İyileştirme işlemi çalıştırılırken kullanılabilen tüm seçenekler için bkz . Veri dosyası düzenini iyileştirme.
Ile anlık görüntüleri temizleme VACUUM
Delta Lake, okumalar için anlık görüntü yalıtımı sağlar; başka bir deyişle, diğer kullanıcılar veya işler tabloyu sorgularken bile iyileştirme işlemini çalıştırmanın güvenli olduğu anlamına gelir. Ancak sonunda eski anlık görüntüleri temizlemeniz gerekir. Vakum işlemini çalıştırarak bunu yapabilirsiniz:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Vakum işlemini etkili bir şekilde kullanma hakkında ayrıntılı bilgi için bkz . Kullanılmayan veri dosyalarını vakumla kaldırma.