Oktatóanyag: Delta Lake
Ez az oktatóanyag bemutatja az Azure Databricks gyakori Delta Lake-műveleteit, beleértve a következőket:
- Tábla létrehozása.
- Felfelé egy táblához.
- Olvasás egy táblából.
- Táblaelőzmények megjelenítése.
- Tábla egy korábbi verziójának lekérdezése.
- Tábla optimalizálása.
- Egy Z-order index hozzáadása.
- Nem hivatkozott fájlok törlése.
A jelen cikkben szereplő Python-, Scala- és SQL-kódot egy Azure Databricks számítási erőforráshoz, például fürthöz csatolt jegyzetfüzetből futtathatja. A cikkben szereplő SQL-kódot a Databricks SQL-ben lévő SQL-raktárhoztársított lekérdezésből is futtathatja.
A forrásadatok előkészítése
Ez az oktatóanyag egy People 10 M nevű adatkészletre támaszkodik. 10 millió fiktív rekordot tartalmaz, amelyek olyan tényeket tartalmaznak az emberekről, mint a vezeték- és utónevek, a születési dátum és a fizetés. Ez az oktatóanyag feltételezi, hogy ez az adatkészlet egy Unity Catalog-kötetben található, amely a cél Azure Databricks-munkaterülethez van társítva.
Ha le szeretné szerezni a People 10 M adatkészletet ehhez az oktatóanyaghoz, tegye a következőket:
- Nyissa meg a Személyek 10 M lapot Kaggleben.
- Kattintson a Letöltés gombra a helyi gépre elnevezett
archive.zip
fájl letöltéséhez. - Bontsa ki a fájlból
export.csv
elnevezettarchive.zip
fájlt. Aexport.csv
fájl tartalmazza az oktatóanyag adatait.
A fájl kötetbe való feltöltéséhez export.csv
tegye a következőket:
- Az oldalsávon kattintson a Katalógus elemre.
- A Katalóguskezelőben keresse meg és nyissa meg azt a kötetet, ahová fel szeretné tölteni a
export.csv
fájlt. - Kattintson a Feltöltés erre a kötetre elemre.
- Húzza a fájlt a helyi gépen, vagy keresse meg és jelölje ki
export.csv
. - Kattintson a Feltöltés gombra.
Az alábbi példakódban cserélje le /Volumes/main/default/my-volume/export.csv
a export.csv
célkötetben lévő fájl elérési útját.
Tábla létrehozása
Az Azure Databricksben létrehozott összes tábla alapértelmezés szerint a Delta Lake-t használja. A Databricks a Unity Catalog által felügyelt táblák használatát javasolja.
Az előző kód példában és az alábbi példakódokban cserélje le a tábla nevét main.default.people_10m
a cél háromrészes katalógusra, sémára és táblanévre a Unity Catalogban.
Feljegyzés
A Delta Lake az azure Databricks összes olvasási, írási és táblázat-létrehozási parancsának alapértelmezett értéke.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Az előző műveletek létrehoznak egy új felügyelt táblát. A Delta-tábla létrehozásakor elérhető lehetőségekről további információt a CREATE TABLE-ben talál.
A Databricks Runtime 13.3 LTS-ben és újabb verziókban a CREATE TABLE LIKE használatával létrehozhat egy új üres Delta-táblát, amely duplikálja a forrás Delta-tábla sémáját és táblázattulajdonságait. Ez különösen hasznos lehet a táblák fejlesztési környezetből éles környezetbe való előléptetéséhez, ahogyan az alábbi kód példában is látható:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Üres tábla létrehozásához használhatja az API-t a Pythonhoz és a DeltaTableBuilder
Scalához készült Delta Lake-ben is. Az egyenértékű DataFrameWriter API-khoz képest ezek az API-k megkönnyítik a további információk megadását, például az oszlopbejegyzéseket, a táblatulajdonságokat és a létrehozott oszlopokat.
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert to a table
Ha frissítéseket és beszúrásokat szeretne egy meglévő Delta-táblába egyesíteni, használja a DeltaTable.merge
metódust a Python és Scala, valamint az SQL MERGE INTO utasításához. Az alábbi példa például adatokat vesz fel a forrástáblából, és egyesíti őket a cél Delta-táblával. Ha mindkét táblában egyező sor található, a Delta Lake a megadott kifejezéssel frissíti az adatoszlopot. Ha nincs egyező sor, a Delta Lake új sort ad hozzá. Ez a művelet upsert néven ismert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Ha megadja az SQL-t *
, ez frissíti vagy beszúrja a céltábla összes oszlopát, feltéve, hogy a forrástábla oszlopai megegyeznek a céltáblával. Ha a céltábla nem rendelkezik ugyanazokkal az oszlopokkal, a lekérdezés elemzési hibát jelez.
Beszúrási művelet végrehajtásakor meg kell adnia egy értéket a táblázat minden oszlopához (például ha a meglévő adathalmazban nincs egyező sor). Azonban nem kell frissítenie az összes értéket.
Az eredmények megtekintéséhez kérdezze le a táblát.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Táblázat olvasása
A Delta-táblák adatait a táblanév vagy a tábla elérési útja alapján érheti el, ahogyan az alábbi példákban is látható:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Írás táblába
A Delta Lake szabványos szintaxist használ az adatok táblákba való írásához.
Ha új adatokat szeretne atomilag hozzáadni egy meglévő Delta-táblához, használja a hozzáfűzési módot az alábbi példákban látható módon:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
A tábla összes adatának cseréjéhez használja a felülírási módot az alábbi példákhoz hasonlóan:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Tábla frissítése
A Delta-táblák predikátumának megfelelő adatokat frissítheti. A példatáblában people_10m
például a következőt futtathatja az oszlop gender
M
rövidítésének F
módosításához Male
Female
:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Törlés táblából
Egy predikátumnak megfelelő adatokat eltávolíthat egy Delta-táblából. Ha például a példatáblában people_10m
törölni szeretné az oszlopban birthDate
1955
lévő értékekkel rendelkező személyeknek megfelelő összes sort, futtassa a következőt:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Fontos
A törlés eltávolítja az adatokat a Delta-tábla legújabb verziójából, de nem távolítja el azokat a fizikai tárolóból, amíg a régi verziók explicit módon ki nem porszívózódnak. Részletekért lásd a vákuumot .
Táblaelőzmények megjelenítése
Egy tábla előzményeinek megtekintéséhez használja a DeltaTable.history
metódust Python és Scala, valamint a DESCRIBE HISTORY utasítást az SQL-ben, amely megadja a származási információkat, beleértve a tábla verzióját, műveletét, felhasználóját stb., minden egyes írásnál a táblába.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
A tábla egy korábbi verziójának lekérdezése (időutazás)
A Delta Lake időutazással lekérdezheti egy Delta-tábla régebbi pillanatképét.
A tábla régebbi verziójának lekérdezéséhez adja meg a tábla verzióját vagy időbélyegét. Ha például le szeretné kérdezni a 0-s verziót vagy az időbélyeget 2024-05-15T22:43:15.000+00:00Z
az előző előzményekből, használja a következőket:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Időbélyegek esetén a rendszer csak dátum- vagy időbélyeg-sztringeket fogad el, például "2024-05-15T22:43:15.000+00:00"
"2024-05-15 22:43:15"
.
A DataFrameReader beállításaival olyan DataFrame-et hozhat létre egy Delta-táblából, amely a tábla egy adott verziójára vagy időbélyegére van rögzítve, például:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
További részletekért lásd : A Delta Lake táblaelőzményeinek ismertetése.
Táblázat optimalizálása
Miután több módosítást végzett egy táblán, sok kis fájllal rendelkezhet. Az olvasási lekérdezések sebességének javítása érdekében az optimalizálási művelettel összecsukhatja a kis fájlokat nagyobbakra:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Z-sorrend oszlopok szerint
Az olvasási teljesítmény további javítása érdekében a kapcsolódó információkat z-rendezéssel csoportosíthatja ugyanabban a fájlkészletben. A Delta Lake adat-kihagyási algoritmusai ezzel a rendezéssel jelentősen csökkentik az olvasandó adatok mennyiségét. A z-order adatokhoz meg kell adnia a z-sorrendben művelet szerint rendezendő oszlopokat. Például a rendezéshez futtassa a következőt gender
:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Az optimalizálási művelet futtatásakor elérhető összes beállításért tekintse meg az adatfájl-elrendezés optimalizálása című témakört.
Pillanatképek törlése a VACUUM
A Delta Lake pillanatkép-elkülönítést biztosít az olvasásokhoz, ami azt jelenti, hogy biztonságosan futtathat optimalizálási műveletet, még akkor is, ha más felhasználók vagy feladatok kérdezik le a táblát. Végül azonban törölnie kell a régi pillanatképeket. Ezt a vákuumművelet futtatásával teheti meg:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
A vákuumművelet hatékony használatáról további információt a nem használt adatfájlok eltávolítása vákuummal című témakörben talál.