Aaron Kub
02/07/2023, 2:27 AM
We have set the delta.deletedFileRetentionDuration table property to "interval 21 days". When we run the vacuum command with a 504-hour retention period (21 days), we end up with corrupted snapshots that were created within the 21-day period. By "corrupted" I mean that when a version that is, say, 10 days old is read, there is a FileNotFoundException for certain part files that have now been deleted unexpectedly.
After digging deeper by running vacuum commands in dry-run mode, I've concluded that the deleted files were (1) "removed" in a snapshot version more than 7 days ago and (2) created over 21 days ago.
The ultimate culprit seems to be that the snapshot Delta uses to generate the validFiles object in the VacuumCommand.gc method is filtering out files that were removed more than 7 days ago, because it's falling back to the default retention period.
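The failure mode described above can be sketched with a small, self-contained model. This is not Delta's code: DataFile, Tombstone, and filesToDelete are hypothetical names, and the rule shown (a removed file is protected only while its tombstone is visible in the snapshot and newer than the vacuum cutoff) is a simplification of what the thread describes.

```scala
// Simplified model of the vacuum decision described in this thread.
// NOT Delta's implementation: all names here are hypothetical.
object VacuumModel {
  val DayMs: Long = 24L * 60 * 60 * 1000

  final case class DataFile(path: String, modifiedAt: Long)
  final case class Tombstone(path: String, deletedAt: Long)

  /** Paths a vacuum would delete, given the tombstones visible in the snapshot. */
  def filesToDelete(
      onDisk: Seq[DataFile],
      activePaths: Set[String],
      visibleTombstones: Seq[Tombstone],
      now: Long,
      retentionMs: Long): Set[String] = {
    val cutoff = now - retentionMs
    // A removed file stays protected while its tombstone is newer than the cutoff.
    val protectedRemoved =
      visibleTombstones.filter(_.deletedAt >= cutoff).map(_.path).toSet
    val validFiles = activePaths ++ protectedRemoved
    onDisk
      .filter(f => f.modifiedAt < cutoff && !validFiles.contains(f.path))
      .map(_.path)
      .toSet
  }
}

object VacuumExample {
  import VacuumModel._

  val now: Long = 100 * DayMs
  // Written 30 days ago, removed from the table 10 days ago.
  val disk = Seq(DataFile("part-0001", now - 30 * DayMs))
  val tombstones = Seq(Tombstone("part-0001", now - 10 * DayMs))

  // Snapshot that keeps 21 days of tombstones: the file is protected.
  val withFullHistory: Set[String] =
    filesToDelete(disk, Set.empty, tombstones, now, 21 * DayMs)

  // Snapshot that already dropped tombstones older than 7 days: the file is deleted.
  val recentOnly = tombstones.filter(_.deletedAt >= now - 7 * DayMs)
  val withTruncatedHistory: Set[String] =
    filesToDelete(disk, Set.empty, recentOnly, now, 21 * DayMs)
}
```

In this model the file matches both observed conditions (removed more than 7 days ago, created more than 21 days ago), and it is deleted only when the snapshot has lost its tombstone.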
I've confirmed that the snapshot only has 7 days of removed files by doing something like this:
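import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.RemoveFile

val deltaLog = DeltaLog.forTable(spark, "s3://path-to-table")
val snapshot = deltaLog.update()
// Keep only the RemoveFile actions (tombstones) in the snapshot state, oldest first.
val removedFiles = snapshot.state
  .filter(_.remove != null)
  .collect()
  .map(_.unwrap.asInstanceOf[RemoveFile])
  .sortBy(_.delTimestamp)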
Lastly, to double-check that the default TOMBSTONE_RETENTION value is in fact what is causing this behavior, I cloned the delta project and changed the default value to 21 days, then ran the above steps again. I then had 21 days of removed file history in the snapshot.
Sanity checks: I have confirmed with the SHOW TBLPROPERTIES command that the retention period on the table is indeed set to 21 days. The checkpoint files in the _delta_log directory also contain 21 days of removed file history. It's just the in-memory snapshot that filters the removed files down to the most recent 7 days.
We are using AWS EMR, Scala 2.12, and Delta 1.0.0, although I have also reproduced this behavior on Delta 2.1.0.
It feels like I must be missing something. Has anyone encountered this?
Scott Sandre (Delta Lake)
02/07/2023, 5:04 PM
"it's defaulting to the default retention period."
Do you see this happening anywhere in the code? To get the tombstone retention period, we always extract it from the latest table metadata:
def tombstoneRetentionMillis(metadata: Metadata): Long = {
  DeltaConfigs.getMilliSeconds(DeltaConfigs.TOMBSTONE_RETENTION.fromMetaData(metadata))
}
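A lookup like the one above depends entirely on what the metadata object contains. The sketch below is a toy stand-in, not Delta's DeltaConfigs: TableMetadata, RetentionConfig, and parseIntervalMillis are hypothetical names, and the parser only handles the simple "interval <n> <unit>" shape used in this thread. It shows how a metadata object with no properties yields the 7-day default, while a configured table yields 21 days (504 hours).

```scala
// Toy stand-in for a retention lookup with a default. NOT Delta's DeltaConfigs:
// TableMetadata, RetentionConfig, and parseIntervalMillis are hypothetical.
final case class TableMetadata(configuration: Map[String, String] = Map.empty)

object RetentionConfig {
  val Key = "delta.deletedFileRetentionDuration"
  // The 7-day default mentioned in this thread, in milliseconds.
  val DefaultMillis: Long = 7L * 24 * 60 * 60 * 1000

  // Handles only "interval <n> hour(s)|day(s)|week(s)".
  def parseIntervalMillis(value: String): Long =
    value.trim.toLowerCase.split("\\s+") match {
      case Array("interval", n, unit) =>
        val hourMs = 60L * 60 * 1000
        unit.stripSuffix("s") match {
          case "hour" => n.toLong * hourMs
          case "day"  => n.toLong * 24 * hourMs
          case "week" => n.toLong * 7 * 24 * hourMs
          case other  => throw new IllegalArgumentException(s"unsupported unit: $other")
        }
      case _ =>
        throw new IllegalArgumentException(s"unsupported interval: $value")
    }

  // Uses the table property when present, otherwise falls back to the default.
  def tombstoneRetentionMillis(metadata: TableMetadata): Long =
    metadata.configuration.get(Key).map(parseIntervalMillis).getOrElse(DefaultMillis)
}
```

Calling `RetentionConfig.tombstoneRetentionMillis(TableMetadata())` returns the default, so the result is only as good as the metadata that is passed in.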
Aaron Kub
02/07/2023, 5:10 PM
[...] Metadata() object, DeltaConfigs.TOMBSTONE_RETENTION.fromMetaData(metadata) returns the default of 7 days. And I suspect the initial snapshot that is created uses the default metadata because of that if (snapshot == null) check. There seems to be a circular relationship there.
SnapshotManagement uses minFileRetentionTimestamp to filter out "unnecessary" removed files from the snapshot, which will be that default of 7 days, rather than the configured retention for the first snapshot.
Scott Sandre (Delta Lake)
02/07/2023, 5:18 PM
"rather than the configured retention for the first snapshot"
How could this be? A "snapshot" is the latest state of the table. If you set the metadata in the first snapshot, then that metadata will still be in the latest snapshot.
Aaron Kub
02/07/2023, 5:26 PM
[...] SnapshotManagement class depends on DeltaLog.metadata (via the minFileRetentionTimestamp) in the SnapshotManagement.getSnapshotAtInit method.
Scott Sandre (Delta Lake)
02/07/2023, 5:33 PM
SnapshotManagement:getSnapshotAtInit -->
DeltaLog:minFileRetentionTimestamp -->
DeltaLog:tombstoneRetentionMillis -->
DeltaConfigs.getMilliSeconds(DeltaConfigs.TOMBSTONE_RETENTION.fromMetaData(metadata)) -->
def metadata = if (snapshot == null) Metadata() else snapshot.metadata -->
where snapshot is SnapshotManagement::protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit(lastCheckpoint)
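The chain traced above can be reproduced in a toy class to see why the first snapshot might be built with the default. This is only a sketch under the assumption that the initialization order is as described: Meta, Snap, and ToyLog are hypothetical names that mirror the chain, not Delta's classes, and the toy models only the snapshot created at construction time.

```scala
// Toy model of the initialization order traced above. NOT Delta's source:
// Meta, Snap, and ToyLog are hypothetical stand-ins.
final case class Meta(retentionDays: Int = 7) // an empty Meta() means the 7-day default
final case class Snap(meta: Meta, tombstoneWindowDays: Int)

class ToyLog(tableMeta: Meta) {
  // Runs during construction, so the field still holds null while
  // getSnapshotAtInit() is executing.
  private var currentSnapshot: Snap = getSnapshotAtInit()

  def snapshot: Snap = currentSnapshot

  // Same shape as the `def metadata` line in the chain above.
  def metadata: Meta = if (snapshot == null) Meta() else snapshot.meta

  def tombstoneRetentionDays: Int = metadata.retentionDays

  // The tombstone window is computed before any snapshot exists,
  // so it sees the default Meta() rather than tableMeta.
  private def getSnapshotAtInit(): Snap =
    Snap(tableMeta, tombstoneWindowDays = tombstoneRetentionDays)
}
```

With `new ToyLog(Meta(retentionDays = 21))`, the snapshot's metadata reports 21 days once construction finishes, but its tombstone window was fixed at 7 days while the snapshot field was still null.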
Aaron Kub
02/07/2023, 5:33 PM
Scott Sandre (Delta Lake)
02/07/2023, 5:38 PM
Aaron Kub
02/07/2023, 5:38 PM