Update Delta Lake tableschema
A Delta Lake segítségével update egy tableschema. A következő típusok támogatottak:
- Új columns hozzáadása (tetszőleges pozícióban)
- Meglévő columns átrendezése
- Meglévő columns átnevezése
Ezeket a módosításokat explicit módon DDL használatával vagy implicit módon DML használatával is elvégezheti.
Fontos
A update-tól a Delta tableschema-ig tartó művelet olyan művelet, amely ütközik az összes egyidejű Delta írási művelettel.
Ha delta tableschemaupdate, az abból a table beolvasott streamek leállnak. Ha azt szeretné, hogy a stream folytatódjon, újra kell indítania. Az ajánlott módszerekért tekintse meg a strukturált streamelés éles környezettel kapcsolatos szempontjait.
A(z) columns hozzáadásához használja az updateschema értékeket expliciten
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Alapértelmezés szerint a nullabilitás a következő true
: .
Egy column beágyazott mezőhöz való hozzáadásához használja a következőt:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Ha például a ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
futtatása előtt a schema a következő:
- root
| - colA
| - colB
| +-field1
| +-field2
a schema utáni:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Feljegyzés
Beágyazott columns hozzáadása csak a szerkezetek esetében támogatott. A tömbök és a térképek nem támogatottak.
Explicit módon updateschema módosítani column megjegyzést vagy sorrendet
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Beágyazott mezőben lévő column módosításához használja a következőt:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Ha például a ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
futtatása előtt a schema a következő:
- root
| - colA
| - colB
| +-field1
| +-field2
a schema utáni:
- root
| - colA
| - colB
| +-field2
| +-field1
Explicit módon updateschema a columns helyére
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Például a következő DDL futtatásakor:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
ha az előző schema:
- root
| - colA
| - colB
| +-field1
| +-field2
a schema utáni rész:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
explicit módon updateschema átnevezni columns
Feljegyzés
Ez a funkció a Databricks Runtime 10.4 LTS és újabb verziókban érhető el.
A columns átnevezéséhez a columnsmeglévő adatainak újraírása nélkül engedélyeznie kell a table-hoz a column leképezését. Lásd átnevezése és columns elvetése a Delta Lake column leképezésszerint.
columnátnevezése céljából:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Beágyazott mező átnevezése:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Ha például a következő parancsot futtatja:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Ha az előző schema:
- root
| - colA
| - colB
| +-field1
| +-field2
Ezt követően a következő schema:
- root
| - colA
| - colB
| +-field001
| +-field2
Lásd: átnevezése és columns elvetése a Delta Lake columnleképezéssel.
egyértelműen updateschema elvetni columns
Feljegyzés
Ez a funkció a Databricks Runtime 11.3 LTS és újabb verziókban érhető el.
Ha columns csak metaadatként szeretné kihagyni adatfájlok újraírása nélkül, engedélyeznie kell a column leképezést a table-re. Lásd: Átnevezés és columns törlés a Delta Lake column leképezés.
Fontos
Metaadatokból a column eltávolítása nem törli a fájlokban lévő column alapjául szolgáló adatokat. Az elvetett column adatok törléséhez a REORG TABLE használatával újraírhatja a fájlokat. Ezután a VACUUM használatával fizikailag törölheti az elvetett column adatokat tartalmazó fájlokat.
columnelvetése:
ALTER TABLE table_name DROP COLUMN col_name
Több columnselvetése:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Kifejezetten updateschema a column típus vagy név módosításához
Módosíthatja egy columntípusát vagy nevét, vagy elvethet egy column a tableújraírásával. Ehhez használja a overwriteSchema
lehetőséget.
Az alábbi példa egy column típusának módosítását mutatja be:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Az alábbi példa bemutatja egy column név megváltoztatását.
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
schema evolúció engedélyezése
Az alábbi műveletek egyikével engedélyezheti schema evolúciót:
-
Set a
.option("mergeSchema", "true")
nevűt Spark DataFramewrite
vagywriteStream
alakítja át. Lásd: Írások schema fejlődésének engedélyezése új columnshozzáadásához. - Szintaxis használata
MERGE WITH SCHEMA EVOLUTION
. Aegyesítéséhez lásd aevolúció szintaxisát. -
Set a Spark conf
spark.databricks.delta.schema.autoMerge.enabled
beállítható a jelenlegi SparkSessiontrue
számára.
A Databricks azt javasolja, hogy a Spark-conf beállítása helyett engedélyezze schema evolúciót az egyes írási műveletekhez.
Ha az írási művelet schema evolúciójának engedélyezéséhez beállításokat vagy szintaxist használ, ez elsőbbséget élvez a Spark-konföderációval szemben.
Feljegyzés
Az INSERT INTO
utasításokhoz nincs schema evolúciós záradék.
Az schema fejlődés engedélyezése az írások új columns számára
Columns elemek, amelyek szerepelnek a forrás lekérdezésben, de hiányoznak a cél table-ből, automatikusan hozzáadódnak egy írási tranzakció részeként, amikor a schema evolúció engedélyezve van. Lásd: engedélyezze a schema evolúciót.
Amikor új columnkerül hozzáfűzésre, a kis- és nagybetűk megmaradnak. A rendszer új columns ad hozzá a tableschemavégéhez. Ha a további columns egy struktúrában vannak, a rendszer hozzáfűzi őket a struktúra végéhez a cél table-ben.
Az alábbi példa bemutatja, hogy az automatikus betöltővel használja a mergeSchema
lehetőséget. Lásd : Mi az automatikus betöltő?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Az alábbi példa azt mutatja be, hogy a mergeSchema
lehetőséget kötegírási művelettel használja:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
A Delta Lake automatikus schema összevonásának evolúciója
A Schema fejlesztés lehetővé teszi a felhasználók számára, hogy feloldják a cél és a forrás table egyesítése közötti schema eltéréseket. A következő két esetet kezeli:
- A forrás tablecolumn nem szerepel a cél table-ben . Az új column hozzáadódik a cél schema-hez, és a values a forrás valuesalapján beszúrásra kerül vagy frissítésre kerül.
- A célban table lévő column nem található a forrás table-ban. A cél schema változatlan marad. A további cél column esetén a values változatlanok maradnak (
UPDATE
esetén), vagy setNULL
(INSERT
esetén).
Manuálisan kell engedélyeznie az automatikus schema evolúciót. Lásd: schema engedélyezzeevolúcióját.
Feljegyzés
A Databricks Runtime 12.2 LTS és újabb verziókban a forrás tablecolumns és strukturált mezői név alapján adhatók meg insert vagy update műveletekben. A Databricks Runtime 11.3 LTS-ben és az alábbi verziókban csak INSERT *
vagy UPDATE SET *
műveletek használhatók schema egyesítéssel történő fejlesztéshez.
A Databricks Runtime 13.3 LTS és újabb verziókban schema evolúciót használhat a térképekbe ágyazott szerkezetekkel, például map<int, struct<a: int, b: int>>
.
Schema egyesítés evolúciós szintaxisa
A Databricks Runtime 15.2-ben és újabb verziókban SQL vagy Delta table API-k használatával megadhatja a schema evolúciót egy merge utasításban.
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Példaműveletek az schema-evolúcióval való egyesítésre
Íme néhány példa a merge
művelet schema evolúcióval és anélküli hatásaira.
(1) Ez a viselkedés a Databricks Runtime 12.2 LTS és újabb verziókban érhető el; Ebben a feltételben a Databricks Runtime 11.3 LTS és az alábbi hiba jelenik meg.
columns kizárása Delta Lake-egyesítéssel
A Databricks Runtime 12.2 LTS-ben és újabb verziókban EXCEPT
záradékok használatával explicit módon kizárhatja columns. A EXCEPT
kulcsszó viselkedése attól függően változik, hogy engedélyezve van-e schema evolúció.
Ha schema evolúció le van tiltva, a EXCEPT
kulcsszó a cél tablecolumnslist vonatkozik, és lehetővé teszi columns kizárását UPDATE
vagy INSERT
műveletekből.
columns ki van zárva a set-től null
-ig.
Ha az schema evolúció engedélyezve van, a EXCEPT
kulcsszó a forrás tablecolumnslist vonatkozik, és lehetővé teszi columns kizárását schema evolúcióból. A forrásban lévő új column, amely nem szerepel a célban, nem lesz hozzáadva a célhoz schema, ha szerepel a EXCEPT
záradékban. Azok a kizárt columns elemek, amelyek már jelen vannak a célban, a set és null
közötti tartományban vannak.
Az alábbi példák ezt a szintaxist mutatják be:
Columns | Lekérdezés (SQL-ben) | Viselkedés schema evolúció nélkül (alapértelmezett) | Az schema evolúcióval kapcsolatos viselkedés |
---|---|---|---|
Cél columns: id, title, last_updated Forrás columns: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. Új sorokat szúr be valuesid és title . A kizárt mező last_updated null set. A mező review figyelmen kívül lesz hagyva, mert nem szerepel a célban. |
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja.
Schema a review mező hozzáadásához fejlődik. Az új sorokat a program az összes forrásmezővel beszúrja, kivéve last_updated , amely set-től null -ig terjed. |
Cél columns: id, title, last_updated Forrás columns: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT hibát jelez, mert columninternal_count nem létezik a cél table. |
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. A review mező hozzáadódik a cél table-hez, de a internal_count mező figyelmen kívül marad. Az új beszúrt sorok last_updated set-től null -ig. |
NullType
columns kezelése schema frissítésekben
Mivel a Parquet nem támogatja a NullType
és NullType
,columns kiesnek a DataFrame-ből a Delta tables-ba való íráskor, de a schematovábbra is tárolja őket. Ha az adott columneltérő adattípus érkezik, a Delta Lake egyesíti a schema az új adattípussal. Ha a Delta Lake meglévő column-hez kap egy NullType
, a régi schema megmarad, és az új column elvetésre kerül az írás során.
NullType
a streamelés nem támogatott. Mivel a streamelés használatakor set sémákat kell használnia, ennek nagyon ritkanak kell lennie.
NullType
nem fogadható el olyan összetett típusok esetében is, mint például ArrayType
a .MapType
table schema cseréje
Alapértelmezés szerint az adatok felülírása a table-n nem írja felül a schema-et. Amikor table felülírását végzi mode("overwrite")
használatával replaceWhere
nélkül, akkor is érdemes felülírni a megírt adatok schema-ját. A schema cseréjét és a table particionálását úgy végezheti el, hogy a overwriteSchema
beállítást true
értékre állítja.
df.write.option("overwriteSchema", "true")
Fontos
Dinamikus partition felülíráskor nem adhatja meg overwriteSchema
-ként true
.