Megosztás a következőn keresztül:


Update Delta Lake tableschema

A Delta Lake segítségével update egy tableschema. A következő típusok támogatottak:

  • Új columns hozzáadása (tetszőleges pozícióban)
  • Meglévő columns átrendezése
  • Meglévő columns átnevezése

Ezeket a módosításokat explicit módon DDL használatával vagy implicit módon DML használatával is elvégezheti.

Fontos

A update-tól a Delta tableschema-ig tartó művelet olyan művelet, amely ütközik az összes egyidejű Delta írási művelettel.

Ha delta tableschemaupdate, az abból a table beolvasott streamek leállnak. Ha azt szeretné, hogy a stream folytatódjon, újra kell indítania. Az ajánlott módszerekért tekintse meg a strukturált streamelés éles környezettel kapcsolatos szempontjait.

A(z) columns hozzáadásához használja az updateschema értékeket expliciten

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Alapértelmezés szerint a nullabilitás a következő true: .

Egy column beágyazott mezőhöz való hozzáadásához használja a következőt:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Ha például a ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) futtatása előtt a schema a következő:

- root
| - colA
| - colB
| +-field1
| +-field2

a schema utáni:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Feljegyzés

Beágyazott columns hozzáadása csak a szerkezetek esetében támogatott. A tömbök és a térképek nem támogatottak.

Explicit módon updateschema módosítani column megjegyzést vagy sorrendet

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Beágyazott mezőben lévő column módosításához használja a következőt:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Ha például a ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST futtatása előtt a schema a következő:

- root
| - colA
| - colB
| +-field1
| +-field2

a schema utáni:

- root
| - colA
| - colB
| +-field2
| +-field1

Explicit módon updateschema a columns helyére

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Például a következő DDL futtatásakor:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

ha az előző schema:

- root
| - colA
| - colB
| +-field1
| +-field2

a schema utáni rész:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

explicit módon updateschema átnevezni columns

Feljegyzés

Ez a funkció a Databricks Runtime 10.4 LTS és újabb verziókban érhető el.

A columns átnevezéséhez a columnsmeglévő adatainak újraírása nélkül engedélyeznie kell a table-hoz a column leképezését. Lásd átnevezése és columns elvetése a Delta Lake column leképezésszerint.

columnátnevezése céljából:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Beágyazott mező átnevezése:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Ha például a következő parancsot futtatja:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Ha az előző schema:

- root
| - colA
| - colB
| +-field1
| +-field2

Ezt követően a következő schema:

- root
| - colA
| - colB
| +-field001
| +-field2

Lásd: átnevezése és columns elvetése a Delta Lake columnleképezéssel.

egyértelműen updateschema elvetni columns

Feljegyzés

Ez a funkció a Databricks Runtime 11.3 LTS és újabb verziókban érhető el.

Ha columns csak metaadatként szeretné kihagyni adatfájlok újraírása nélkül, engedélyeznie kell a column leképezést a table-re. Lásd: Átnevezés és columns törlés a Delta Lake column leképezés.

Fontos

Metaadatokból a column eltávolítása nem törli a fájlokban lévő column alapjául szolgáló adatokat. Az elvetett column adatok törléséhez a REORG TABLE használatával újraírhatja a fájlokat. Ezután a VACUUM használatával fizikailag törölheti az elvetett column adatokat tartalmazó fájlokat.

columnelvetése:

ALTER TABLE table_name DROP COLUMN col_name

Több columnselvetése:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Kifejezetten updateschema a column típus vagy név módosításához

Módosíthatja egy columntípusát vagy nevét, vagy elvethet egy column a tableújraírásával. Ehhez használja a overwriteSchema lehetőséget.

Az alábbi példa egy column típusának módosítását mutatja be:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Az alábbi példa bemutatja egy column név megváltoztatását.

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

schema evolúció engedélyezése

Az alábbi műveletek egyikével engedélyezheti schema evolúciót:

  • Set a .option("mergeSchema", "true") nevűt Spark DataFrame write vagy writeStream alakítja át. Lásd: Írások schema fejlődésének engedélyezése új columnshozzáadásához.
  • Szintaxis használata MERGE WITH SCHEMA EVOLUTION . Aegyesítéséhez lásd a evolúció szintaxisát.
  • Set a Spark conf spark.databricks.delta.schema.autoMerge.enabled beállítható a jelenlegi SparkSession true számára.

A Databricks azt javasolja, hogy a Spark-conf beállítása helyett engedélyezze schema evolúciót az egyes írási műveletekhez.

Ha az írási művelet schema evolúciójának engedélyezéséhez beállításokat vagy szintaxist használ, ez elsőbbséget élvez a Spark-konföderációval szemben.

Feljegyzés

Az INSERT INTO utasításokhoz nincs schema evolúciós záradék.

Az schema fejlődés engedélyezése az írások új columns számára

Columns elemek, amelyek szerepelnek a forrás lekérdezésben, de hiányoznak a cél table-ből, automatikusan hozzáadódnak egy írási tranzakció részeként, amikor a schema evolúció engedélyezve van. Lásd: engedélyezze a schema evolúciót.

Amikor új columnkerül hozzáfűzésre, a kis- és nagybetűk megmaradnak. A rendszer új columns ad hozzá a tableschemavégéhez. Ha a további columns egy struktúrában vannak, a rendszer hozzáfűzi őket a struktúra végéhez a cél table-ben.

Az alábbi példa bemutatja, hogy az automatikus betöltővel használja a mergeSchema lehetőséget. Lásd : Mi az automatikus betöltő?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Az alábbi példa azt mutatja be, hogy a mergeSchema lehetőséget kötegírási művelettel használja:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

A Delta Lake automatikus schema összevonásának evolúciója

A Schema fejlesztés lehetővé teszi a felhasználók számára, hogy feloldják a cél és a forrás table egyesítése közötti schema eltéréseket. A következő két esetet kezeli:

  1. A forrás tablecolumn nem szerepel a cél table-ben . Az új column hozzáadódik a cél schema-hez, és a values a forrás valuesalapján beszúrásra kerül vagy frissítésre kerül.
  2. A célban table lévő column nem található a forrás table-ban. A cél schema változatlan marad. A további cél column esetén a values változatlanok maradnak (UPDATEesetén), vagy setNULL (INSERTesetén).

Manuálisan kell engedélyeznie az automatikus schema evolúciót. Lásd: schema engedélyezzeevolúcióját.

Feljegyzés

A Databricks Runtime 12.2 LTS és újabb verziókban a forrás tablecolumns és strukturált mezői név alapján adhatók meg insert vagy update műveletekben. A Databricks Runtime 11.3 LTS-ben és az alábbi verziókban csak INSERT * vagy UPDATE SET * műveletek használhatók schema egyesítéssel történő fejlesztéshez.

A Databricks Runtime 13.3 LTS és újabb verziókban schema evolúciót használhat a térképekbe ágyazott szerkezetekkel, például map<int, struct<a: int, b: int>>.

Schema egyesítés evolúciós szintaxisa

A Databricks Runtime 15.2-ben és újabb verziókban SQL vagy Delta table API-k használatával megadhatja a schema evolúciót egy merge utasításban.

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Példaműveletek az schema-evolúcióval való egyesítésre

Íme néhány példa a merge művelet schema evolúcióval és anélküli hatásaira.

Columns Lekérdezés (SQL-ben) Viselkedés schema evolúció nélkül (alapértelmezett) A schema evolúcióval kapcsolatos viselkedés
Cél columns: key, value

Forrás columns: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
A tableschema változatlan marad; csak columnskey, value frissülnek/beszúrhatók. A tableschema megváltozott (key, value, new_value)-re. A meglévő egyezésekkel rendelkező rekordok frissülnek a forrásban és value a new_value forrásban. Az új sorokat a schema(key, value, new_value)segítségével szúrjuk be.
Cél columns: key, old_value

Forrás columns: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATE és INSERT műveletek hibát jeleznek, mert a cél columnold_value nem szerepel a forrásban. A tableschema(key, old_value, new_value)-re változott. A meglévő egyezésekkel rendelkező rekordok frissülnek a new_value forrásban változatlanul hagyva old_value . A rendszer új rekordokat szúr be a megadott keyés new_valueNULL a old_value.
Cél columns: key, old_value

Forrás columns: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE hibát jelez, mert columnnew_value nem létezik a cél table. A tableschema(key, old_value, new_value)-ra változik. A meglévő egyezésekkel rendelkező rekordok frissülnek a new_value forrásban változatlanul hagyva old_value , és a nem egyező rekordok be lettek NULL adva a következőhöz new_value: . Lásd az 1. megjegyzést.
Cél columns: key, old_value

Forrás columns: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT hibát jelez, mert columnnew_value nem létezik a cél table. A tableschema(key, old_value, new_value)-re változott. A rendszer új rekordokat szúr be a megadott keyés new_valueNULL a old_value. A meglévő rekordok NULL változatlanok maradnak new_valueold_value . Lásd az 1. megjegyzést.

(1) Ez a viselkedés a Databricks Runtime 12.2 LTS és újabb verziókban érhető el; Ebben a feltételben a Databricks Runtime 11.3 LTS és az alábbi hiba jelenik meg.

columns kizárása Delta Lake-egyesítéssel

A Databricks Runtime 12.2 LTS-ben és újabb verziókban EXCEPT záradékok használatával explicit módon kizárhatja columns. A EXCEPT kulcsszó viselkedése attól függően változik, hogy engedélyezve van-e schema evolúció.

Ha schema evolúció le van tiltva, a EXCEPT kulcsszó a cél tablecolumnslist vonatkozik, és lehetővé teszi columns kizárását UPDATE vagy INSERT műveletekből. columns ki van zárva a set-től null-ig.

Ha az schema evolúció engedélyezve van, a EXCEPT kulcsszó a forrás tablecolumnslist vonatkozik, és lehetővé teszi columns kizárását schema evolúcióból. A forrásban lévő új column, amely nem szerepel a célban, nem lesz hozzáadva a célhoz schema, ha szerepel a EXCEPT záradékban. Azok a kizárt columns elemek, amelyek már jelen vannak a célban, a set és nullközötti tartományban vannak.

Az alábbi példák ezt a szintaxist mutatják be:

Columns Lekérdezés (SQL-ben) Viselkedés schema evolúció nélkül (alapértelmezett) Az schema evolúcióval kapcsolatos viselkedés
Cél columns: id, title, last_updated

Forrás columns: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. Új sorokat szúr be valuesid és title. A kizárt mező last_updatednullset. A mező review figyelmen kívül lesz hagyva, mert nem szerepel a célban. A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. Schema a reviewmező hozzáadásához fejlődik. Az új sorokat a program az összes forrásmezővel beszúrja, kivéve last_updated, amely set-től null-ig terjed.
Cél columns: id, title, last_updated

Forrás columns: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT hibát jelez, mert columninternal_count nem létezik a cél table. A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. A review mező hozzáadódik a cél table-hez, de a internal_count mező figyelmen kívül marad. Az új beszúrt sorok last_updatedset-től null-ig.

NullType columns kezelése schema frissítésekben

Mivel a Parquet nem támogatja a NullTypeés NullType,columns kiesnek a DataFrame-ből a Delta tables-ba való íráskor, de a schematovábbra is tárolja őket. Ha az adott columneltérő adattípus érkezik, a Delta Lake egyesíti a schema az új adattípussal. Ha a Delta Lake meglévő column-hez kap egy NullType, a régi schema megmarad, és az új column elvetésre kerül az írás során.

NullType a streamelés nem támogatott. Mivel a streamelés használatakor set sémákat kell használnia, ennek nagyon ritkanak kell lennie. NullTypenem fogadható el olyan összetett típusok esetében is, mint például ArrayType a .MapType

table schema cseréje

Alapértelmezés szerint az adatok felülírása a table-n nem írja felül a schema-et. Amikor table felülírását végzi mode("overwrite") használatával replaceWherenélkül, akkor is érdemes felülírni a megírt adatok schema-ját. A schema cseréjét és a table particionálását úgy végezheti el, hogy a overwriteSchema beállítást trueértékre állítja.

df.write.option("overwriteSchema", "true")

Fontos

Dinamikus partition felülíráskor nem adhatja meg overwriteSchema-ként true.