Adatok szelektív felülírása a Delta Lake használatával
Az Azure Databricks a Delta Lake funkcióit használja a szelektív felülírások két különböző lehetőségének támogatásához:
- A
replaceWhere
beállítás atomilag lecseréli az adott predikátumnak megfelelő összes rekordot. - Az adatkönyvtárakat a tables partíciója alapján dinamikus partition felülírásokkal cserélheti le.
A legtöbb művelet esetében a Databricks replaceWhere
azt javasolja, hogy adja meg, mely adatokat írja felül.
Fontos
Ha véletlenül felülírták az adatokat, restore használatával visszavonhatja a módosítást.
Tetszőleges szelektív felülírás a replaceWhere
Csak az tetszőleges kifejezésnek megfelelő adatokat írhatja felül szelektíven.
Feljegyzés
Az SQL-hez a Databricks Runtime 12.2 LTS vagy újabb verziója szükséges.
A következő parancs atomilag lecseréli a start_date
által particionált cél tablejanuári eseményeit a replace_data
adataira:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Ez a mintakód beírja az adatokat replace_data
, ellenőrzi, hogy az összes sor megfelel-e a predikátumnak, és szemantikával overwrite
végez-e atomi pótlást. Ha a values bármely eleme kívül esik a constrainthatárain, ez a művelet alapértelmezés szerint hibával meghiúsul.
Ezt a viselkedést módosíthatja úgy, hogy a overwrite
ésvalues rekordokat a predikátumtartományon belül szelektálja, valamint a insert
rekordokat, amelyek a megadott tartományon kívül esnek. Ehhez tiltsa le a constraint-ellenőrzést úgy, hogy a spark.databricks.delta.replaceWhere.constraintCheck.enabled
hamis értékre állítja az alábbi beállítások egyikével:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Örökölt viselkedés
A régi alapértelmezett viselkedés során a replaceWhere
csak a predikátumnak megfelelő adatokat írta felül partitioncolumns-n. Ezzel az örökölt modellel a következő parancs atomilag lecseréli a januári hónapot a date
által particionált cél tablea df
adataira:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Ha vissza szeretne esni a régi viselkedésre, letilthatja a jelzőt spark.databricks.delta.replaceWhere.dataColumns.enabled
:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
dinamikus partition felülírása
Fontos
Ez a funkció a nyilvános előzetes verzióban érhető el.
A Databricks Runtime 11.3 LTS és az újabb verziók támogatják a dinamikuspartition felülírási módot a particionált tablesesetén. A több partícióval rendelkező tables esetében a Databricks Runtime 11.3 LTS és a korábbi verziók csak akkor támogatják a dinamikus partition felülírást, ha az összes partitioncolumns azonos adattípusú.
Dinamikus partition felülírási módban a műveletek felülírják az összes meglévő adatot minden olyan logikai partition-ben, amelybe az írás új adatokat rögzít. A meglévő logikai partíciók, amelyek írása nem tartalmaz adatokat, változatlanok maradnak. Ez a mód csak akkor alkalmazható, ha az adatok felülírási módban vannak megírva: INSERT OVERWRITE
SQL-ben vagy DataFrame-írással df.write.mode("overwrite")
.
Dinamikus partition felülírási mód konfigurálása a Spark munkamenet spark.sql.sources.partitionOverwriteMode
konfigurációjának dynamic
beállításával. Ezt a beállítást a beállítás DataFrameWriter
partitionOverwriteMode
beállításával dynamic
is engedélyezheti. Ha van ilyen, a lekérdezésspecifikus beállítás felülírja a munkamenet-konfigurációban definiált módot. Az alapértelmezett érték a partitionOverwriteMode
következő static
: .
Fontos
Győződjön meg arról, hogy a dinamikus partition-val írt adatok csak a várt partíciókat érintik. A helytelen partition egyetlen sora véletlenül a teljes partitionfelülírásához vezethet.
Az alábbi példa a dinamikus partition felülírások használatát mutatja be:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Feljegyzés
- A dinamikus partition felülírja az ütközéseket a particionált tables
replaceWhere
lehetőséggel.- Ha a Spark-munkamenet konfigurációjában engedélyezve van a dinamikus partition felülírás, és
replaceWhere
DataFrameWriter
lehetőségként van megadva, akkor a Delta Lake felülírja az adatokat areplaceWhere
kifejezés szerint (a lekérdezésspecifikus beállítások felülbírálják a munkamenet-konfigurációkat). - Hibaüzenet jelenik meg, ha a
DataFrameWriter
beállításainak dinamikus partition felülírása ésreplaceWhere
engedélyezve van.
- Ha a Spark-munkamenet konfigurációjában engedélyezve van a dinamikus partition felülírás, és
- Dinamikus partition felülíráskor nem adhatja meg
overwriteSchema
-kénttrue
-et.