Megosztás a következőn keresztül:


Adatok szelektív felülírása a Delta Lake használatával

Az Azure Databricks a Delta Lake funkcióit használja a szelektív felülírások két különböző lehetőségének támogatásához:

  • A replaceWhere beállítás atomilag lecseréli az adott predikátumnak megfelelő összes rekordot.
  • Az adatkönyvtárakat a tables partíciója alapján dinamikus partition felülírásokkal cserélheti le.

A legtöbb művelet esetében a Databricks replaceWhere azt javasolja, hogy adja meg, mely adatokat írja felül.

Fontos

Ha véletlenül felülírták az adatokat, restore használatával visszavonhatja a módosítást.

Tetszőleges szelektív felülírás a replaceWhere

Csak az tetszőleges kifejezésnek megfelelő adatokat írhatja felül szelektíven.

Feljegyzés

Az SQL-hez a Databricks Runtime 12.2 LTS vagy újabb verziója szükséges.

A következő parancs atomilag lecseréli a start_dateáltal particionált cél tablejanuári eseményeit a replace_dataadataira:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Ez a mintakód beírja az adatokat replace_data, ellenőrzi, hogy az összes sor megfelel-e a predikátumnak, és szemantikával overwrite végez-e atomi pótlást. Ha a values bármely eleme kívül esik a constrainthatárain, ez a művelet alapértelmezés szerint hibával meghiúsul.

Ezt a viselkedést módosíthatja úgy, hogy a overwriteésvalues rekordokat a predikátumtartományon belül szelektálja, valamint a insert rekordokat, amelyek a megadott tartományon kívül esnek. Ehhez tiltsa le a constraint-ellenőrzést úgy, hogy a spark.databricks.delta.replaceWhere.constraintCheck.enabled hamis értékre állítja az alábbi beállítások egyikével:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Örökölt viselkedés

A régi alapértelmezett viselkedés során a replaceWhere csak a predikátumnak megfelelő adatokat írta felül partitioncolumns-n. Ezzel az örökölt modellel a következő parancs atomilag lecseréli a januári hónapot a dateáltal particionált cél tablea dfadataira:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Ha vissza szeretne esni a régi viselkedésre, letilthatja a jelzőt spark.databricks.delta.replaceWhere.dataColumns.enabled :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

dinamikus partition felülírása

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

A Databricks Runtime 11.3 LTS és az újabb verziók támogatják a dinamikuspartition felülírási módot a particionált tablesesetén. A több partícióval rendelkező tables esetében a Databricks Runtime 11.3 LTS és a korábbi verziók csak akkor támogatják a dinamikus partition felülírást, ha az összes partitioncolumns azonos adattípusú.

Dinamikus partition felülírási módban a műveletek felülírják az összes meglévő adatot minden olyan logikai partition-ben, amelybe az írás új adatokat rögzít. A meglévő logikai partíciók, amelyek írása nem tartalmaz adatokat, változatlanok maradnak. Ez a mód csak akkor alkalmazható, ha az adatok felülírási módban vannak megírva: INSERT OVERWRITE SQL-ben vagy DataFrame-írással df.write.mode("overwrite").

Dinamikus partition felülírási mód konfigurálása a Spark munkamenet spark.sql.sources.partitionOverwriteMode konfigurációjának dynamicbeállításával. Ezt a beállítást a beállítás DataFrameWriterpartitionOverwriteModebeállításával dynamic is engedélyezheti. Ha van ilyen, a lekérdezésspecifikus beállítás felülírja a munkamenet-konfigurációban definiált módot. Az alapértelmezett érték a partitionOverwriteMode következő static: .

Fontos

Győződjön meg arról, hogy a dinamikus partition-val írt adatok csak a várt partíciókat érintik. A helytelen partition egyetlen sora véletlenül a teljes partitionfelülírásához vezethet.

Az alábbi példa a dinamikus partition felülírások használatát mutatja be:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Feljegyzés

  • A dinamikus partition felülírja az ütközéseket a particionált tablesreplaceWhere lehetőséggel.
    • Ha a Spark-munkamenet konfigurációjában engedélyezve van a dinamikus partition felülírás, és replaceWhereDataFrameWriter lehetőségként van megadva, akkor a Delta Lake felülírja az adatokat a replaceWhere kifejezés szerint (a lekérdezésspecifikus beállítások felülbírálják a munkamenet-konfigurációkat).
    • Hibaüzenet jelenik meg, ha a DataFrameWriter beállításainak dinamikus partition felülírása és replaceWhere engedélyezve van.
  • Dinamikus partition felülíráskor nem adhatja meg overwriteSchema-ként true-et.