Delta Lake table geçmişi üzerinde çalışma
Delta Lake table değiştiren her işlem yeni bir table sürümü oluşturur. Geçmiş bilgilerini kullanarak işlemleri denetleyebilir, tablegeri alabilir veya zaman yolculuğu kullanarak belirli bir noktada table sorgulayabilirsiniz.
Not
Databricks, delta Lake table geçmişinin veri arşivleme için uzun vadeli bir yedekleme çözümü olarak kullanılmasını önermez. Databricks, hem veri hem de günlük saklama yapılandırmalarını daha büyük bir değere set sürece zaman yolculuğu işlemleri için yalnızca son 7 günün kullanılmasını önerir.
Delta table geçmişini alma
Delta table üzerindeki her yazma işlemi için işlemler, kullanıcı ve zaman damgası gibi bilgileri history
komutunu çalıştırarak alabilirsiniz. İşlemler ters kronolojik sırayla döndürülür.
Table geçmişi saklama, delta.logRetentionDuration
table ayarı ile belirlenir ve bu ayar varsayılan olarak 30 gündür.
Not
Zaman yolculuğu ve table geçmişi farklı saklama eşikleri tarafından denetlenmektedir. Bkz . Delta Lake zaman atlatma nedir?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL söz dizimi ayrıntıları için bkz. DESCRIBE HISTORY.
Scala/Java/Python söz dizimi ayrıntıları için Delta Lake API belgelerine bakın.
Catalog Explorer, Delta tablesiçin bu ayrıntılı table bilgilerinin ve geçmişinin görsel bir görünümünü sağlar.
table
schema ve örnek verilere ek olarak, DESCRIBE HISTORY
ile görüntülenen table geçmişini görmek için Geçmişi sekmesine tıklayabilirsiniz.
Geçmiş schema
history
işleminin çıktısı aşağıdaki columns'e sahiptir.
Column | Türü | Açıklama |
---|---|---|
sürüm | uzun | Table işlem tarafından oluşturulan sürüm. |
timestamp | timestamp | Bu sürüm işlendiğinde. |
userId | Dize | İşlemi çalıştıran kullanıcının kimliği. |
userName | Dize | İşlemi çalıştıran kullanıcının adı. |
operation | Dize | İşlemin adı. |
operationParameters | map | İşlemin Parameters'ı (örneğin, yüklemler). |
iş | struct | İşlemi çalıştıran işin ayrıntıları. |
not defteri | struct | İşlemin çalıştırıldığı not defterinin ayrıntıları. |
clusterId | Dize | İşlemin üzerinde çalıştırıldığı kümenin kimliği. |
readVersion | uzun | Yazma işlemini gerçekleştirmek için okunan table sürümü. |
isolationLevel | Dize | Bu işlem için kullanılan yalıtım düzeyi. |
isBlindAppend | boolean | Bu işlemin verileri ekleyip eklemediği. |
operationMetrics | map | İşlemin ölçümleri (örneğin, değiştirilen satır ve dosya sayısı.) |
userMetadata | Dize | Belirtilmişse kullanıcı tanımlı işleme meta verileri |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Not
- Delta table'e aşağıdaki yöntemlerle yazarsanız, diğer columns'lardan bazıları kullanılamaz:
- Gelecekte eklenecek olan Columns daima son column'den sonra eklenecektir.
İşlem ölçümleri anahtarları
history
işlemi, operationMetrics
column eşlemesinde bir işlem ölçümleri koleksiyonu döndürür.
Aşağıdaki tableslist harita anahtarı tanımları işlemlere göre verilmiştir.
İşlem | Ölçüm adı | Açıklama |
---|---|---|
YAZ, CREATE TABLE'ı SELECTOLARAK, DEĞİŞTİR TABLE'yi SELECTOLARAK, COPY INTO | ||
numFiles | Yazılan dosya sayısı. | |
numOutputBytes | Yazılan içeriğin bayt cinsinden boyutu. | |
numOutputRows | Yazılan satır sayısı. | |
AKıŞ UPDATE | ||
numAddedFiles | Eklenen dosya sayısı. | |
numRemovedFiles | Kaldırılan dosya sayısı. | |
numOutputRows | Yazılan satır sayısı. | |
numOutputBytes | Bayt cinsinden yazma boyutu. | |
SİL | ||
numAddedFiles | Eklenen dosya sayısı. table bölümleri silindiğinde sağlanmamaktadır. | |
numRemovedFiles | Kaldırılan dosya sayısı. | |
numDeletedRows | Kaldırılan satır sayısı. table bölümleri silindiğinde sağlanmaz. | |
numCopiedRows | Dosyaları silme işleminde kopyalanan satır sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
scanTimeMs | Dosyaları eşleşmeler için taramak için geçen süre. | |
rewriteTimeMs | Eşleşen dosyaları yeniden yazmak için geçen süre. | |
TRUNCATE | ||
numRemovedFiles | Kaldırılan dosya sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
BİRLEŞMEK | ||
numSourceRows | Kaynak DataFrame'deki satır sayısı. | |
numTargetRowsInserted | Hedef table'e eklenen satır sayısı. | |
numTargetRowsUpdated | hedefte tablegüncellenen satır sayısı. | |
numTargetRowsDeleted | Hedef table'de silinen satırların sayısı. | |
numTargetRowsCopied | Kopyalanan hedef satır sayısı. | |
numOutputRows | Yazılan toplam satır sayısı. | |
numTargetFilesAdded | Havuza (hedef) eklenen dosya sayısı. | |
numTargetFilesRemoved | Havuzdan (hedef) kaldırılan dosya sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
scanTimeMs | Dosyaları eşleşmeler için taramak için geçen süre. | |
rewriteTimeMs | Eşleşen dosyaları yeniden yazmak için geçen süre. | |
UPDATE | ||
numAddedFiles | Eklenen dosya sayısı. | |
numRemovedFiles | Kaldırılan dosya sayısı. | |
numUpdatedRows | Güncelleştirilen satır sayısı. | |
numCopiedRows | Dosyaları güncelleştirme işleminde kopyalanan satır sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
scanTimeMs | Dosyaları eşleşmeler için taramak için geçen süre. | |
rewriteTimeMs | Eşleşen dosyaları yeniden yazmak için geçen süre. | |
FSCK | numRemovedFiles | Kaldırılan dosya sayısı. |
DÖNÜŞTÜRMEK | numConvertedFiles | Dönüştürülen Parquet dosyalarının sayısı. |
OPTIMIZE | ||
numAddedFiles | Eklenen dosya sayısı. | |
numRemovedFiles | İyileştirilmiş dosya sayısı. | |
numAddedBytes | table iyileştirildikten sonra eklenen bayt sayısı. | |
numRemovedBytes | Kaldırılan bayt sayısı. | |
minFileSize | table iyileştirildikten sonra en küçük dosyanın boyutu. | |
p25FileSize | table optimize edildikten sonra 25. yüzdebirlik dosyanın boyutu. | |
p50FileSize | table iyileştirildikten sonra ortanca dosya boyutu. | |
p75FileSize | table optimize edildikten sonra 75. persentil dosyanın boyutu. | |
maxFileSize | table iyileştirildikten sonra en büyük dosyanın boyutu. | |
CLONE | ||
sourceTableSize | Klonlanmış olan versiyondaki kaynak table'ın boyutu bayt olarak. | |
sourceNumOfFiles | Kopyalanan sürümde kaynak table'daki dosya sayısı. | |
numRemovedFiles | Önceki bir Delta table değiştirildiyse, hedef table'den kaldırılan dosya sayısı. | |
removedFilesSize | Eğer önceki bir Delta table değiştirildiyse, hedef table'dan kaldırılan dosyaların bayt cinsinden toplam boyutu. | |
numCopiedFiles | Yeni konuma kopyalanan dosyaların sayısı. Sığ klonlar için 0. | |
copiedFilesSize | Yeni konuma kopyalanan dosyaların bayt cinsinden toplam boyutu. Sığ klonlar için 0. | |
RESTORE | ||
tableSizeAfterRestore | restoresonrası Table boyutunun bayt cinsinden hesabı. | |
numOfFilesAfterRestore | restoresonra table dosya sayısı. | |
numRemovedFiles | restore işlemi tarafından kaldırılan dosya sayısı. | |
numRestoredFiles | restoresonucu eklenen dosya sayısı. | |
removedFilesSize | restoretarafından kaldırılan dosyaların bayt cinsinden boyutu. | |
restoredFilesSize | restoretarafından eklenen dosyaların bayt cinsinden boyutu. | |
VACUUM | ||
numDeletedFiles | Silinen dosya sayısı. | |
numVacuumedDirectories | Vakumlanmış dizin sayısı. | |
numFilesToDelete | Silinecek dosya sayısı. |
Delta Lake zaman yolculuğu nedir?
Delta Lake zaman yolculuğu, zaman damgasına veya table sürümüne göre (işlem günlüğüne kaydedildiği gibi) önceki table sürümlerini sorgulamayı destekler. Aşağıdaki gibi uygulamalar için zaman yolculuğu kullanabilirsiniz:
- Çözümlemeleri, raporları veya çıkışları yeniden oluşturma (örneğin, bir makine öğrenmesi modelinin çıktısı). Bu, özellikle düzenlemeye tabi sektörlerde hata ayıklama veya denetim için yararlı olabilir.
- Karmaşık zamana bağlı sorgular yazma.
- Verilerinizdeki hataları düzeltme.
- hızlı değişen tablesiçin bir sorgu kümesi set'a yönelik anlık görüntü izolasyonu sağlama.
Önemli
Zaman yolculuğuyla erişilebilen Table sürümler, işlem günlüğü dosyaları için bekletme eşiği ile VACUUM
işlemleri için sıklık ve belirtilen bekletmenin bir bileşimiyle belirlenir. varsayılan valuesile günlük VACUUM
çalıştırırsanız, zaman yolculuğu için 7 günlük veri kullanılabilir.
Delta zaman atlatma söz dizimi
table ad belirtiminin arkasına bir yan tümce ekleyerek zaman yolculuğu ile Delta table'ı sorgulayabilirsiniz.
-
timestamp_expression
şu türlerden herhangi biri olabilir:-
'2018-10-18T22:15:12.013Z'
, başka bir ifadeyle zaman damgasına atanabilen bir dizedir cast('2018-10-18 13:36:32 CEST' as timestamp)
-
'2018-10-18'
, yani bir tarih dizesi current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Zaman damgasına atanabilen veya atanabilen diğer tüm ifadeler
-
-
version
, çıkışından elde edilebilen uzun bir değerdirDESCRIBE HISTORY table_spec
.
Alt timestamp_expression
sorgular da version
olamaz.
Yalnızca tarih veya zaman damgası dizeleri kabul edilir. Örneğin, "2019-01-01"
ve "2019-01-01T00:00:00.000Z"
. Söz dizimi gibi aşağıdaki koda bakın:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Zaman damgasını veya sürümü table adının bir parçası olarak belirtmek için @
söz dizimini de kullanabilirsiniz. Zaman damgası biçiminde yyyyMMddHHmmssSSS
olmalıdır. sürümünden sonra @
bir ön koşula bağlanarak bir v
sürüm belirtebilirsiniz. Söz dizimi gibi aşağıdaki koda bakın:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
İşlem günlüğü denetim noktaları nedir?
Delta Lake, table sürümleri table veriyle birlikte depolanan _delta_log
dizininde JSON dosyaları olarak kaydeder.
optimize denetim noktası sorgulaması için, Delta Lake, table sürümlerini Parquet denetim noktası dosyalarına toplayarak table geçmişinin tüm JSON sürümlerini okuma gereğini ortadan kaldırır. Azure Databricks, veri boyutu ve iş yükü için denetim noktası oluşturma sıklığını iyileştirir. Kullanıcıların denetim noktalarıyla doğrudan etkileşim kurması gerekmez. Denetim noktası sıklığı bildirimde bulunmadan değiştirilebilir.
Zaman yolculuğu sorguları için veri saklamayı yapılandırma
Önceki bir
VACUUM
bir tableüzerinde çalıştığında veri dosyaları silinir. Delta Lake, denetim noktası table sürümlerine geçtikten sonra günlük dosyasının kaldırılmasını otomatik olarak yönetir.
Çoğu Delta tables üzerinde düzenli olarak VACUUM
çalıştırıldığından, anlık sorgular varsayılan olarak 7 gün olan VACUUM
için bekletme eşiğine uygun olmalıdır.
Delta tablesiçin veri saklama eşiğini artırmak için aşağıdaki table özelliklerini yapılandırmanız gerekir:
-
delta.logRetentionDuration = "interval <interval>"
: table geçmişinin ne kadar süreyle tutulduğunu denetler. Varsayılan değer:interval 30 days
. -
delta.deletedFileRetentionDuration = "interval <interval>"
:VACUUM
, artık geçerli table sürümünde başvurulmayan veri dosyalarını remove için kullanırken eşiği belirler. Varsayılan değer:interval 7 days
.
Delta özelliklerini table oluşturma sırasında belirtebilir veya bir ALTER TABLE
ifadesi ile set edebilirsiniz. Bkz. Delta table özellik referansı.
Not
Sık VACUUM
işlemleri olan tables için table geçmişinin daha uzun süre korunmasını sağlamak için bu özelliklerin ikisini de set gerekir. Örneğin, 30 günlük geçmiş verilere erişmek için setdelta.deletedFileRetentionDuration = "interval 30 days"
(delta.logRetentionDuration
için varsayılan ayarla eşleşir).
Veri saklama eşiğinin artırılması, daha fazla veri dosyası tutuldukçe depolama maliyetlerinizin artmasına neden olabilir.
Restore Delta table'yi önceki bir duruma
restore komutunu kullanarak delta table'i önceki durumuna RESTORE
yapabilirsiniz. Delta table, table'in önceki bir duruma geri yüklenmesini sağlayan geçmiş sürümlerini dahili olarak saklar.
Önceki duruma veya önceki durumun oluşturulduğu zaman damgasına karşılık gelen bir sürüm, RESTORE
komutu tarafından seçenekler olarak desteklenir.
Önemli
- Zaten geri yüklenmiş bir table'i restore.
- kopyalanmıştableyapabilirsin restore.
- Geri yüklenen table üzerinde
MODIFY
izniniz olmalıdır. - Veri dosyalarının el ile veya
vacuum
tarafından silinmesini where eski bir sürüme tablerestore.spark.sql.files.ignoreMissingFiles
settrue
ise, bu sürüme kısmen geri yükleme yapmak hâlâ mümkündür. - Önceki bir duruma geri yüklemek için zaman damgası biçimi şeklindedir
yyyy-MM-dd HH:mm:ss
. Yalnızca bir tarih(yyyy-MM-dd
) dizesi sağlanması da desteklenir.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Söz dizimi ayrıntıları için bkz. RESTORE.
Önemli
Restore, veri değiştirme işlemi olarak kabul edilir. Delta Lake günlük girdileri, RESTORE
komutu tarafından eklenen veri değişikliklerini içermekte olup,set true olarak ayarlanmaktadır. tr-TR: Eğer Delta Lake tablegüncellemelerini işleyen, örneğin bir Yapılandırılmış akış işi gibi bir ardışık uygulama varsa, restore işlemi tarafından eklenen veri değişiklik günlüğü girişleri yeni veri güncellemeleri olarak değerlendirilir ve bunların işlenmesi, yinelenen verilere yol açabilir.
Örneğin:
Table sürümü | İşlem | Delta günlüğü güncelleştirmeleri | Veri değişikliği günlük güncelleştirmelerindeki kayıtlar |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (ad = Viktor, yaş = 29, (ad = George, yaş = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (tableiçindeki verileri değiştirmediği için Optimize sıkıştırması sonucu hiçbir kayıt yoktur) |
3 | RESTORE(sürüm=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (ad = Viktor, yaş = 29), (ad = George, yaş = 55), (ad = George, yaş = 39) |
Yukarıdaki örnekte, RESTORE
komutu Delta table sürüm 0 ve 1 okunurken zaten görülen güncelleştirmelere neden olur. Bir akış sorgusu bu tableokuyorsa, bu dosyalar yeni eklenen veriler olarak kabul edilir ve yeniden işlenir.
Restore ölçümleri
RESTORE
işlem tamamlandıktan sonra aşağıdaki ölçümleri tek satırlık DataFrame olarak bildirir:
table_size_after_restore
: Geri yükleme sonrası table boyutu.num_of_files_after_restore
: Geri yüklemeden sonra table dosya sayısı.num_removed_files
: tablekaldırılan (mantıksal olarak silinen) dosya sayısı.num_restored_files
: Geri dönme nedeniyle geri yüklenen dosyaların sayısı.removed_files_size
: tablekaldırılan dosyaların bayt cinsinden toplam boyutu.restored_files_size
: Geri yüklenen dosyaların bayt cinsinden toplam boyutu.
Delta Lake zaman atlatma kullanımına örnekler
Yanlışlıkla silinen table'ları kullanıcı
111
için düzeltin.INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Bir table'da yanlışlıkla yapılan hatalı güncellemeyi düzeltin.
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Geçen hafta eklenen yeni müşterilerin sayısını sorgula.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Spark oturumunda son işlemenin sürümünü Nasıl yaparım? buldunuz?
Geçerli SparkSession
tarafından tüm iş parçacıklarında ve tüm tablesyazılan son işlemenin sürüm numarasını get için SQL yapılandırmasını spark.databricks.delta.lastCommitVersionInSession
sorgular.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
tarafından SparkSession
hiçbir işleme yapılmadıysa, anahtarın sorgulanması boş bir değer döndürür.
Not
Aynı SparkSession
değeri birden çok iş parçacığında paylaşıyorsanız, bu bir değişkeni birden çok iş parçacığı arasında paylaşmaya benzer; yapılandırma değeri eşzamanlı olarak güncelleştirildikçe yarış koşullarına çarpabilirsiniz.