Itai Yaffe (07/17/2023, 10:23 AM):
"Warning: Concurrent writes to the same Delta table on S3 storage from multiple Spark drivers can lead to data loss. For a multi-cluster solution, please see the Multi-cluster setup section below."
What about maintenance tasks (OPTIMIZE, DELETE, VACUUM)? This doc sheds some light, but there are a few things I'd like to verify:
1. The doc mentions that a file compaction operation (which I'm assuming is what folks used to do before OPTIMIZE was open-sourced?) cannot conflict with INSERT. Is it safe to assume OPTIMIZE and INSERT cannot result in a conflict even if they operate on the same set of files (e.g. the table is partitioned by DATE and both operations work on the same day)?
2. VACUUM is not mentioned at all in this doc - I'm assuming (due to the nature of the VACUUM operation) it means it cannot conflict with any of the other operations, correct?
Thanks!

Dominique Brezinski (07/17/2023, 2:04 PM):

Itai Yaffe (07/17/2023, 4:11 PM):

Dominique Brezinski (07/17/2023, 4:26 PM):

Itai Yaffe (07/18/2023, 8:56 AM):
delta-rs, IIUC.
Anyway, we'll have to take a closer look and evaluate the effort.
Thanks again for your help, @Dominique Brezinski!

S Thelin (07/18/2023, 9:28 AM):
Itai Yaffe (07/18/2023, 10:43 AM):
Regarding VACUUM - @Dominique Brezinski said it's safe (as it doesn't change the log), so is there a reason you only run VACUUM at specific times when nothing else is running?
Also, how are you running OPTIMIZE and DELETE (which are not safe)?

S Thelin (07/18/2023, 10:56 AM):

Itai Yaffe (07/18/2023, 2:14 PM):
So even operations that cannot conflict (e.g. OPTIMIZE and INSERT) can still lead to data loss if executed from separate clusters without multi-cluster support enabled;
Any other thoughts/insights on using DynamoDB in an "unconventional" manner for multi-cluster support (on a private cloud with no DynamoDB available), either by:
1. Storing the data+metadata on an S3-compatible Object Storage, but using a "remote" DynamoDB on AWS; OR
2. Using a DynamoDB-compatible replacement like ScyllaDB (https://www.scylladb.com/alternator/, per @Christian Pfarr's comment on this issue)?
Thanks!

Dominique Brezinski (07/19/2023, 1:40 PM):

Itai Yaffe (07/19/2023, 1:54 PM):

Dominique Brezinski (07/19/2023, 2:52 PM):

S Thelin (07/19/2023, 3:23 PM):

Itai Yaffe (07/20/2023, 1:43 PM):
1. The Delta log JSON files (stored in the _delta_log folder) and the Checkpoints (stored in Parquet) are immutable. Each modification of the table is recorded to the transaction log via a new Delta JSON file.
2. The only time a file in the transaction log is overwritten is when a writer produces a new checkpoint file, since it also needs to overwrite _delta_log/_last_checkpoint to point to that new checkpoint file (see the layout sketch below).
3. If #1 and #2 are correct, then data loss (in the case of an S3-like Object Store without multi-cluster support enabled) can occur if writer A wrote a new Delta log file, and writer B immediately wrote a new Delta log file with the same name, essentially overwriting the previous file, due to the non-atomic nature of the S3-like Object Store (e.g. per this paragraph).
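(A rough sketch of the layout being described - bucket and table names are illustrative:)
```
s3://bucket/my-table/_delta_log/
  00000000000000000000.json                <- commit 0: written once, immutable
  00000000000000000001.json                <- commit 1: ditto
  ...
  00000000000000000010.checkpoint.parquet  <- checkpoint: also immutable
  _last_checkpoint                          <- the one file that gets overwritten
```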
Would that be a reasonable description?
Thanks a lot!

Dominique Brezinski (07/20/2023, 2:12 PM):

Itai Yaffe (07/24/2023, 11:05 AM):
1. (The part about "Oops! It did not create a delta table as expected" is interesting and not specific only to Rust)
2. https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/

S Thelin (07/24/2023, 3:17 PM):
07/24/2023, 3:17 PMspark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore
Scott Haines (07/24/2023, 5:27 PM):
• delta.isolationLevel supports Serializable, WriteSerializable, or SnapshotIsolation. Serializable is the most aggressive - table history is always accurate. This is probably what you want in your case.
• To the point made by @S Thelin - you can use the thrown exceptions to retry. This behavior will differ for batch jobs vs streaming jobs (batch is probably easier - let the job fail, and retry via orchestration). On the next attempt, the modified Delta snapshot will be re-read (with the new changes) if the collision is captured as a ConcurrentModificationException, via the specific exception for the given operation (ConcurrentDeleteReadException, etc.) - see the retry sketch below.
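(Just to make the retry bullet concrete - a minimal batch-side sketch; writeWithRetry and writeBatch are illustrative names, and Delta's conflict exceptions all extend java.util.ConcurrentModificationException:)
```scala
import java.util.ConcurrentModificationException

// Sketch: retry a conflicting batch write a few times. Delta's conflict
// exceptions (ConcurrentAppendException, ConcurrentDeleteReadException, ...)
// extend java.util.ConcurrentModificationException, so one catch covers them.
def writeWithRetry(maxAttempts: Int)(writeBatch: () => Unit): Unit = {
  var attempt = 1
  var succeeded = false
  while (!succeeded) {
    try {
      writeBatch() // e.g. df.write.format("delta").mode("append").save(path)
      succeeded = true
    } catch {
      case _: ConcurrentModificationException if attempt < maxAttempts =>
        attempt += 1 // conflict: the next attempt reads the fresh snapshot
      // once attempts are exhausted, the exception propagates to the caller
    }
  }
}
```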
In the case of OPTIMIZE, while it can collide, a useful pattern is to optimize only "sealed" partitions. For example, if the job writes to daily partitions, then the prefix will be fairly noisy (read/write heavy) on the day of, but afterwards there is less chance of collision (this is the simplest solution).
Alternatives:
You can use the Delta Lake tblproperties to coordinate changes, or to react to complex coordination.
MetadataChangedException (https://docs.delta.io/latest/concurrency-control.html#metadatachangedexception) can be used to lock and unlock a table. If an external process is being used to run periodic OPTIMIZE or other transactions on a shared table, then create something like platform.ops.transactions.lock.enabled=true, platform.ops.transactions.lock.owner=X, platform.ops.transactions.lock.operation=OPTIMIZE, which can be added or removed using ALTER TABLE {x} SET TBLPROPERTIES (key=value, ...). Note that the MetadataChangedException will halt streaming pipelines; so if DeltaTable.forName(spark, "catalog.schema.table").properties() contains any information that can be used to coordinate, then it is more work, but it can be used to essentially do the locking and unlocking (see the sketch below).
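(A minimal sketch of that locking convention - the platform.ops.* keys are the illustrative ones above, not built-in Delta properties, and SHOW TBLPROPERTIES is used for the check since it's plain Spark SQL:)
```scala
import org.apache.spark.sql.SparkSession

// Sketch of the ad-hoc lock: set/unset custom table properties and have
// cooperating writers check them before starting a transaction.
def isLocked(spark: SparkSession, table: String): Boolean =
  spark.sql(s"SHOW TBLPROPERTIES $table")
    .collect()
    .exists(r => r.getString(0) == "platform.ops.transactions.lock.enabled"
              && r.getString(1) == "true")

def lock(spark: SparkSession, table: String, owner: String, op: String): Unit =
  spark.sql(
    s"""ALTER TABLE $table SET TBLPROPERTIES (
       |  'platform.ops.transactions.lock.enabled' = 'true',
       |  'platform.ops.transactions.lock.owner' = '$owner',
       |  'platform.ops.transactions.lock.operation' = '$op')""".stripMargin)

def unlock(spark: SparkSession, table: String): Unit =
  spark.sql(s"ALTER TABLE $table UNSET TBLPROPERTIES IF EXISTS (" +
    "'platform.ops.transactions.lock.enabled', " +
    "'platform.ops.transactions.lock.owner', " +
    "'platform.ops.transactions.lock.operation')")
```
A cooperating writer would call isLocked before starting and back off on its own; transactions already in flight when the ALTER TABLE lands are the ones that fail with MetadataChangedException.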
Itai Yaffe (07/25/2023, 8:00 AM):
The tblproperties idea is interesting.
A few follow-up questions/comments:
1. Just to clarify, in the lack-of-multi-cluster-support scenario, regardless of which table isolation mode I'm using, no conflict exceptions will be thrown - we'll just have data loss. This statement still holds, correct?
2. As for using Delta's tblproperties - I believe I will have to programmatically check DeltaTable.forName(spark, "catalog.schema.table").properties() before initiating any write, and if it contains the "custom lock" properties, I need to throw the exception myself, correct?
3. Maybe I'm missing something, but w.r.t. the test setup you mentioned (having a streaming reader and running an ALTER TABLE from another process) - per the docs, MetadataChangedException occurs when a concurrent transaction updates the metadata of a Delta table. Why would the streaming reader get this exception when another process alters the table?
Thanks!

Scott Haines (07/26/2023, 4:25 PM):
It can be a change to any of the delta.* properties, or it can be a change affecting the schema, or a table overwrite, like in the case of table-restoration patterns with live consumers.
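(For illustration - one way a streaming job could surface and handle that halt, assuming io.delta.exceptions.MetadataChangedException per the concurrency-control docs linked above; startQuery is a placeholder for whatever (re)creates the stream:)
```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}
import io.delta.exceptions.MetadataChangedException

// Sketch: a streaming query halted by a metadata change is restarted,
// picking up the table's new snapshot (and its new properties).
def runUntilCleanStop(startQuery: () => StreamingQuery): Unit = {
  var keepGoing = true
  while (keepGoing) {
    val query = startQuery()
    try {
      query.awaitTermination()
      keepGoing = false // stopped without error
    } catch {
      case e: StreamingQueryException
          if e.getCause.isInstanceOf[MetadataChangedException] =>
        // e.g. someone ran ALTER TABLE ... SET TBLPROPERTIES mid-stream;
        // loop and restart the query
    }
  }
}
```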
Itai Yaffe (07/27/2023, 2:12 PM):
We'll evaluate our options (tblproperties, or something else) before potentially getting back to this thread with follow-ups.
Thanks 🙂

Dominique Brezinski (07/27/2023, 2:17 PM):

Itai Yaffe (07/27/2023, 2:22 PM):
(e.g. a custom LogStore that relies on something other than DynamoDB).