Bekerja dengan riwayat tabel Delta Lake
Setiap operasi yang memodifikasi tabel Delta Lake membuat versi tabel baru. Anda dapat menggunakan informasi riwayat untuk mengaudit operasi, memutar kembali tabel, atau mengkueri tabel pada titik waktu tertentu menggunakan perjalanan waktu.
Catatan
Databricks tidak merekomendasikan penggunaan riwayat tabel Delta Lake sebagai solusi pencadangan jangka panjang untuk pengarsipan data. Databricks merekomendasikan penggunaan hanya 7 hari terakhir untuk operasi perjalanan waktu kecuali Anda telah mengatur konfigurasi retensi data dan log ke nilai yang lebih besar.
Mengambil riwayat tabel Delta
Anda dapat mengambil informasi termasuk operasi, pengguna, dan tanda waktu untuk setiap penulisan ke tabel Delta dengan menjalankan history
perintah . Operasi dikembalikan dalam urutan kronologis terbalik.
Retensi riwayat tabel ditentukan oleh pengaturan delta.logRetentionDuration
tabel , yaitu 30 hari secara default.
Catatan
Perjalanan waktu dan riwayat tabel dikendalikan oleh ambang batas retensi yang berbeda. Lihat Apa itu perjalanan waktu Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Untuk detail sintaks Spark SQL, lihat DESCRIBE HISTORY.
Lihat dokumentasi Delta Lake API untuk detail sintaks Scala/Java/Python.
Catalog Explorer menyediakan tampilan visual informasi dan riwayat tabel terperinci ini untuk tabel Delta. Selain skema tabel dan data sampel, Anda dapat mengklik tab Riwayat untuk melihat riwayat tabel yang ditampilkan dengan DESCRIBE HISTORY
.
Skema riwayat
Output operasi history
memiliki kolom berikut.
Column | Tipe | Deskripsi |
---|---|---|
version | long | Versi tabel yang dihasilkan oleh operasi. |
rentang waktu | rentang waktu | Saat versi ini diterapkan. |
userId | string | ID pengguna yang menjalankan operasi. |
userName | string | Nama pengguna yang menjalankan operasi. |
operasi | string | Nama operasi. |
operationParameters | peta | Parameter operasi (misalnya, predikat.) |
tugas | struktur | Rincian pekerjaan yang menjalankan operasi. |
buku catatan | struktur | Detail buku catatan tempat operasi dijalankan. |
clusterId | string | ID kluster tempat operasi dijalankan. |
readVersion | long | Versi tabel yang dibaca untuk melakukan operasi tulis. |
isolationLevel | string | Tingkat isolasi yang digunakan untuk operasi ini. |
isBlindAppend | Boolean | Apakah operasi ini menambahkan data. |
operationMetrics | peta | Metrik operasi (misalnya, jumlah baris dan file yang dimodifikasi.) |
userMetadata | string | Metadata penerapan yang ditentukan pengguna jika ditentukan |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Catatan
- Beberapa kolom lain tidak tersedia jika Anda menulis ke dalam tabel Delta menggunakan metode berikut:
- Kolom yang ditambahkan di masa mendatang akan selalu ditambahkan setelah kolom terakhir.
Kunci metrik operasi
Operasi history
mengembalikan kumpulan metrik operasi di peta kolom operationMetrics
.
Tabel berikut mencantumkan definisi kunci peta berdasarkan operasi.
Operasi | Nama metrik | Deskripsi |
---|---|---|
TULIS, CREATE TABLE AS SELECT, GANTI TABLE AS SELECT, COPY INTO | ||
numFiles | Jumlah file yang ditulis. | |
numOutputBytes | Ukuran dalam byte dari konten tertulis. | |
numOutputRows | Jumlah baris yang ditulis. | |
Streaming UPDATE | ||
numAddedFiles | Jumlah file yang ditambahkan. | |
numRemovedFiles | Jumlah file yang dihapus. | |
numOutputRows | Jumlah baris yang ditulis. | |
numOutputBytes | Ukuran tulis dalam byte. | |
DELETE | ||
numAddedFiles | Jumlah file yang ditambahkan. Tidak disediakan saat partisi tabel dihapus. | |
numRemovedFiles | Jumlah file yang dihapus. | |
numDeletedRows | Jumlah baris yang dihapus. Tidak disediakan saat partisi tabel dihapus. | |
numCopiedRows | Jumlah baris yang disalin dalam proses menghapus file. | |
executionTimeMs | Waktu yang dibutuhkan untuk menjalankan seluruh operasi. | |
scanTimeMs | Waktu yang dibutuhkan untuk memindai file untuk kecocokan. | |
rewriteTimeMs | Waktu yang dibutuhkan untuk menulis ulang file yang cocok. | |
TRUNCATE | ||
numRemovedFiles | Jumlah file yang dihapus. | |
executionTimeMs | Waktu yang dibutuhkan untuk menjalankan seluruh operasi. | |
GABUNG | ||
numSourceRows | Jumlah baris di DataFrame sumber. | |
numTargetRowsInserted | Jumlah baris yang dimasukkan ke dalam tabel target. | |
numTargetRowsUpdated | Jumlah baris yang diperbarui dalam tabel target. | |
numTargetRowsDeleted | Jumlah baris yang dihapus dalam tabel target. | |
numTargetRowsCopied | Jumlah baris target yang disalin. | |
numOutputRows | Jumlah total baris yang ditulis. | |
numTargetFilesAdded | Jumlah file yang ditambahkan ke sink(target). | |
numTargetFilesRemoved | Jumlah file yang dihapus dari sink(target). | |
executionTimeMs | Waktu yang dibutuhkan untuk menjalankan seluruh operasi. | |
scanTimeMs | Waktu yang dibutuhkan untuk memindai file untuk kecocokan. | |
rewriteTimeMs | Waktu yang dibutuhkan untuk menulis ulang file yang cocok. | |
UPDATE | ||
numAddedFiles | Jumlah file yang ditambahkan. | |
numRemovedFiles | Jumlah file yang dihapus. | |
numUpdatedRows | Jumlah baris yang diperbarui. | |
numCopiedRows | Jumlah baris yang baru saja disalin dalam proses memperbarui file. | |
executionTimeMs | Waktu yang dibutuhkan untuk menjalankan seluruh operasi. | |
scanTimeMs | Waktu yang dibutuhkan untuk memindai file untuk kecocokan. | |
rewriteTimeMs | Waktu yang dibutuhkan untuk menulis ulang file yang cocok. | |
FSCK | numRemovedFiles | Jumlah file yang dihapus. |
CONVERT | numConvertedFiles | Jumlah file Parket yang telah dikonversi. |
OPTIMIZE | ||
numAddedFiles | Jumlah file yang ditambahkan. | |
numRemovedFiles | Jumlah file yang dioptimalkan. | |
numAddedBytes | Jumlah byte yang ditambahkan setelah tabel dioptimalkan. | |
numRemovedBytes | Jumlah byte yang dihapus. | |
minFileSize | Ukuran file terkecil setelah tabel dioptimalkan. | |
p25FileSize | Ukuran file persentil ke-25 setelah tabel dioptimalkan. | |
p50FileSize | Ukuran file median setelah tabel dioptimalkan. | |
p75FileSize | Ukuran file persentil ke-75 setelah tabel dioptimalkan. | |
maxFileSize | Ukuran file terbesar setelah tabel dioptimalkan. | |
CLONE | ||
sourceTableSize | Ukuran dalam byte tabel sumber pada versi yang dikloning. | |
sourceNumOfFiles | Jumlah file dalam tabel sumber pada versi yang dikloning. | |
numRemovedFiles | Jumlah file yang dihapus dari tabel target jika tabel Delta sebelumnya diganti. | |
removedFilesSize | Ukuran total dalam byte file dihapus dari tabel target jika tabel Delta sebelumnya diganti. | |
numCopiedFiles | Jumlah file yang disalin ke lokasi baru. 0 untuk klon dangkal. | |
copiedFilesSize | Ukuran total dalam byte file yang disalin ke lokasi baru. 0 untuk klon dangkal. | |
RESTORE | ||
tableSizeAfterRestore | Ukuran tabel dalam byte setelah pemulihan. | |
numOfFilesAfterRestore | Jumlah file dalam tabel setelah pemulihan. | |
numRemovedFiles | Jumlah file yang dihapus oleh operasi pemulihan. | |
numRestoredFiles | Jumlah file yang ditambahkan sebagai hasil pemulihan. | |
removedFilesSize | Ukuran dalam byte file yang dihapus oleh pemulihan. | |
restoredFilesSize | Ukuran dalam byte file yang ditambahkan oleh pemulihan. | |
VACUUM | ||
numDeletedFiles | Jumlah file yang dihapus. | |
numVacuumedDirectories | Jumlah direktori yang dikosongkan. | |
numFilesToDelete | Jumlah file yang akan dihapus. |
Apa itu perjalanan waktu Delta Lake?
Perjalanan waktu Delta Lake mendukung kueri versi tabel sebelumnya berdasarkan tanda waktu atau versi tabel (seperti yang dicatat dalam log transaksi). Anda dapat menggunakan perjalanan waktu untuk aplikasi seperti berikut:
- Membuat ulang analisis, laporan, atau keluaran (misalnya, keluaran model pembelajaran mesin). Ini bisa berguna untuk debugging atau audit, terutama di industri yang diatur.
- Menulis pertanyaan temporal yang kompleks.
- Memperbaiki kesalahan dalam data Anda.
- Menyediakan isolasi snapshot untuk sekumpulan kueri untuk tabel yang berubah dengan cepat.
Penting
Versi tabel yang dapat diakses dengan perjalanan waktu ditentukan oleh kombinasi ambang retensi untuk file log transaksi dan frekuensi dan retensi yang ditentukan untuk VACUUM
operasi. Jika Anda menjalankan VACUUM
setiap hari dengan nilai default, data 7 hari tersedia untuk perjalanan waktu.
Sintaks perjalanan waktu delta
Anda mengkueri tabel Delta dengan perjalanan waktu dengan menambahkan klausa setelah spesifikasi nama tabel.
-
timestamp_expression
dapat berupa salah satu dari:-
'2018-10-18T22:15:12.013Z'
, yaitu, string yang dapat ditransmisikan ke stempel waktu cast('2018-10-18 13:36:32 CEST' as timestamp)
-
'2018-10-18'
, yaitu, string tanggal current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Ekspresi lain yang sedang atau dapat ditransmisikan ke stempel waktu
-
-
version
adalah nilai panjang yang dapat diperoleh dari outputDESCRIBE HISTORY table_spec
.
Baik timestamp_expression
maupun version
tidak boleh berupa subkueri.
Hanya string tanggal atau tanda waktu yang diterima. Misalnya, "2019-01-01"
dan "2019-01-01T00:00:00.000Z"
. Lihat kode berikut untuk contoh sintaks:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Anda juga dapat menggunakan @
sintaks untuk menentukan tanda waktu atau versi sebagai bagian dari nama tabel. Stempel waktu harus dalam format yyyyMMddHHmmssSSS
. Anda dapat menentukan versi setelah @
dengan menambahkan v
ke versi tersebut. Lihat kode berikut untuk contoh sintaks:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Apa itu titik pemeriksaan log transaksi?
Delta Lake merekam versi tabel sebagai file JSON dalam _delta_log
direktori, yang disimpan bersama data tabel. Untuk mengoptimalkan kueri titik pemeriksaan, Delta Lake mengagregasi versi tabel ke file titik pemeriksaan Parquet, mencegah kebutuhan untuk membaca semua riwayat tabel versi JSON. Azure Databricks mengoptimalkan frekuensi titik pemeriksaan untuk ukuran data dan beban kerja. Pengguna tidak perlu berinteraksi dengan titik pemeriksaan secara langsung. Frekuensi titik pemeriksaan dapat berubah tanpa pemberitahuan.
Mengonfigurasi retensi data untuk kueri perjalanan waktu
Untuk mengkueri versi tabel sebelumnya, Anda harus menyimpan log dan file data untuk versi tersebut.
File data dihapus saat VACUUM
dijalankan terhadap tabel. Delta Lake mengelola penghapusan file log secara otomatis setelah mengecek versi tabel.
Karena sebagian besar tabel Delta telah VACUUM
berjalan terhadapnya secara teratur, kueri point-in-time harus menghormati ambang retensi untuk VACUUM
, yaitu 7 hari secara default.
Untuk meningkatkan ambang retensi data untuk tabel Delta, Anda harus mengonfigurasi properti tabel berikut:
-
delta.logRetentionDuration = "interval <interval>"
: mengontrol lama riwayat untuk tabel disimpan. Default adalahinterval 30 days
. -
delta.deletedFileRetentionDuration = "interval <interval>"
: menentukan ambang batasVACUUM
yang digunakan untuk menghapus file data yang tidak lagi dirujuk dalam versi tabel saat ini. Default adalahinterval 7 days
.
Anda dapat menentukan properti Delta selama pembuatan tabel atau mengaturnya dengan ALTER TABLE
pernyataan. Lihat Referensi properti tabel Delta.
Catatan
Anda harus mengatur kedua properti ini untuk memastikan riwayat tabel dipertahankan untuk durasi yang lebih lama untuk tabel dengan operasi yang sering VACUUM
. Misalnya, untuk mengakses 30 hari data historis, atur delta.deletedFileRetentionDuration = "interval 30 days"
(yang cocok dengan pengaturan default untuk delta.logRetentionDuration
).
Meningkatkan ambang retensi data dapat menyebabkan biaya penyimpanan Anda naik, karena lebih banyak file data dipertahankan.
Memulihkan tabel Delta ke kondisi sebelumnya
Anda dapat memulihkan tabel Delta ke kondisi sebelumnya dengan menggunakan perintah RESTORE
. Tabel Delta secara internal mempertahankan versi historis tabel yang memungkinkannya dipulihkan ke kondisi sebelumnya.
Versi yang sesuai dengan kondisi sebelumnya atau stempel waktu saat kondisi sebelumnya dibuat didukung sebagai opsi oleh perintah RESTORE
.
Penting
- Anda dapat memulihkan tabel yang sudah dipulihkan.
- Anda dapat memulihkan tabel kloning.
- Anda harus memiliki izin
MODIFY
pada tabel yang sedang dipulihkan. - Anda tidak dapat memulihkan tabel ke versi yang lebih lama di mana file data dihapus secara manual atau oleh
vacuum
. Memulihkan ke versi ini sebagian masih memungkinkan jikaspark.sql.files.ignoreMissingFiles
diatur ketrue
. - Format stempel waktu untuk memulihkan ke kondisi sebelumnya adalah
yyyy-MM-dd HH:mm:ss
. Hanya menyediakan string tanggal(yyyy-MM-dd
) juga didukung.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Untuk detail sintaks, lihat RESTORE.
Penting
Pemulihan dianggap sebagai operasi pengubahan data. Entri log Delta Lake yang ditambahkan oleh perintah RESTORE
berisi dataChange yang diatur ke true. Jika ada aplikasi hilir, seperti pekerjaan Streaming terstruktur yang memproses pembaruan ke tabel Delta Lake, entri log perubahan data yang ditambahkan oleh operasi pemulihan dianggap sebagai pembaruan data baru, dan memprosesnya mungkin mengakibatkan data duplikat.
Contohnya:
Versi tabel | Operasi | Pembaruan log Delta | Rekaman dalam pembaruan log perubahan data |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (nama = George, usia = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Tidak ada rekaman sebagai Optimalkan pemadatan tidak mengubah data dalam tabel) |
3 | RESTORE(versi=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (nama = Viktor, usia = 29), (nama = George, usia = 55), (nama = George, usia = 39) |
Pada contoh sebelumnya, perintah RESTORE
menghasilkan pembaruan yang sudah terlihat saat membaca tabel Delta versi 0 dan 1. Jika kueri streaming membaca tabel ini, maka file ini akan dianggap sebagai data yang baru ditambahkan dan akan diproses lagi.
Pulihkan metrik
RESTORE
melaporkan metrik berikut sebagai DataFrame baris tunggal setelah operasi selesai:
table_size_after_restore
: Ukuran tabel setelah memulihkan.num_of_files_after_restore
: Jumlah file dalam tabel setelah memulihkan.num_removed_files
: Jumlah file yang dihapus (dihapus secara logis) dari tabel.num_restored_files
: Jumlah file yang dipulihkan karena bergulir kembali.removed_files_size
: Ukuran total dalam byte file yang dihapus dari tabel.restored_files_size
: Ukuran total dalam byte file yang dipulihkan.
Contoh penggunaan perjalanan waktu Delta Lake
Memperbaiki penghapusan yang tidak disengaja ke tabel untuk pengguna
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Memperbaiki pembaruan yang salah secara tidak sengaja ke tabel:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Tanyakan jumlah pelanggan baru yang ditambahkan selama seminggu terakhir.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Bagaimana cara menemukan versi penerapan terakhir dalam sesi Spark?
Untuk mendapatkan nomor versi dari penerapan terakhir yang ditulis oleh SparkSession
saat ini di semua utas dan semua tabel, kueri konfigurasi SQL spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Jika tidak ada komit yang dibuat oleh SparkSession
, mengkueri kunci mengembalikan nilai kosong.
Catatan
Jika Anda berbagi SparkSession
yang sama di beberapa utas, ini serupa dengan berbagi variabel di beberapa utas; Anda dapat mencapai kondisi balap karena nilai konfigurasi diperbarui secara bersamaan.