Udostępnij za pośrednictwem


Selektywne zastępowanie danych za pomocą usługi Delta Lake

Usługa Azure Databricks korzysta z funkcji usługi Delta Lake, aby obsługiwać dwie różne opcje selektywnego zastępowania:

  • Opcja replaceWhere niepodzielna zastępuje wszystkie rekordy zgodne z danym predykatem.
  • Możesz zastąpić foldery danych w zależności od sposobu partycjonowania tabel, używając dynamicznego nadpisywania partycji.

W przypadku większości operacji usługa Databricks zaleca użycie polecenia replaceWhere w celu określenia, które dane mają być zastępowane.

Ważne

Jeśli dane zostały przypadkowo zastąpione, możesz użyć przywróć, aby cofnąć zmianę.

Dowolne selektywne zastępowanie za pomocą polecenia replaceWhere

Można selektywnie zastąpić tylko dane pasujące do dowolnego wyrażenia.

Uwaga

Sql wymaga środowiska Databricks Runtime 12.2 LTS lub nowszego.

Następujące polecenie niepodzielnie zastępuje zdarzenia w styczniu w tabeli docelowej, która jest partycjonowana na podstawie start_datez danymi w replace_data.

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Ten przykładowy kod zapisuje dane w replace_datapliku , sprawdza, czy wszystkie wiersze są zgodne z predykatem i wykonuje niepodzielne zastąpienie przy użyciu overwrite semantyki. Jeśli jakiekolwiek wartości operacji wykraczają poza ograniczenie, ta operacja domyślnie kończy się niepowodzeniem z powodu błędu.

To zachowanie można zmienić na overwrite wartości mieszczące się w zakresie predykatu i insert rekordy wykraczające poza określony zakres. Aby to zrobić, wyłącz sprawdzanie ograniczeń, ustawiając spark.databricks.delta.replaceWhere.constraintCheck.enabled wartość false przy użyciu jednego z następujących ustawień:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Zachowanie starszej wersji

Starsze domyślne zachowanie powodowało, że replaceWhere zastępowało dane spełniające tylko warunek w kolumnach partycji. W przypadku tego starszego modelu następujące polecenie spowoduje atomowe zastąpienie miesiąca stycznia w tabeli docelowej, która jest partycjonowana przez date, danymi z df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Jeśli chcesz wrócić do starego zachowania, możesz wyłączyć flagę spark.databricks.delta.replaceWhere.dataColumns.enabled :

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

nadpisywanie partycji dynamicznej

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Środowisko Databricks Runtime 11.3 LTS lub nowsze obsługuje tryb zastępowania partycji dynamicznej dla tabel partycjonowanych nowszych. W przypadku tabel z wieloma partycjami środowisko Databricks Runtime 11.3 LTS i poniżej obsługuje tylko zastępowanie partycji dynamicznej, jeśli wszystkie kolumny partycji mają ten sam typ danych.

W trybie zastępowania partycji dynamicznej operacje zastępują wszystkie istniejące dane w każdej partycji logicznej, dla której zapis zatwierdza nowe dane. Wszystkie istniejące partycje logiczne, dla których zapis nie zawiera danych, pozostają niezmienione. Ten tryb ma zastosowanie tylko wtedy, gdy dane są zapisywane w trybie zastępowania: INSERT OVERWRITE w języku SQL lub w obiekcie DataFrame zapisu za pomocą polecenia df.write.mode("overwrite").

Skonfiguruj tryb nadpisywania dynamicznych partycji, ustawiając konfigurację sesji Spark z spark.sql.sources.partitionOverwriteMode na dynamic. Można to również włączyć, ustawiając DataFrameWriter opcję partitionOverwriteMode na dynamic. Jeśli istnieje, opcja specyficzna dla zapytania zastępuje tryb zdefiniowany w konfiguracji sesji. Wartość domyślna to partitionOverwriteModestatic.

Ważne

Sprawdź, czy dane zapisane za pomocą dynamicznego nadpisywania partycji dotykają tylko oczekiwane partycje. Pojedynczy wiersz w nieprawidłowej partycji może prowadzić do przypadkowego nadpisania całej partycji.

W poniższym przykładzie pokazano użycie dynamicznego nadpisywania partycji.

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Uwaga

  • Partycja dynamiczna zastępuje konflikty z opcją replaceWhere dla tabel partycjonowanych.
    • Jeśli funkcja zastępowania partycji dynamicznej jest włączona w konfiguracji sesji platformy Spark, a replaceWhere jest dostępna jako opcja DataFrameWriter, usługa Delta Lake zastępuje dane zgodnie z wyrażeniem replaceWhere (opcje specyficzne dla zapytania zastępują konfiguracje sesji).
    • Otrzymasz błąd, jeśli opcje DataFrameWriter mają włączone zarówno zastąpienie partycji dynamicznej, jak i replaceWhere.
  • Nie można określić overwriteSchema jako true podczas używania zastępowania partycji dynamicznej.