
Itai Yaffe

07/17/2023, 10:23 AM
Hey - S3/S3-like object stores, concurrent writes, and maintenance tasks.
For S3 (and S3-like object stores), Delta Lake's documentation states that concurrent writes to the same table could result in data loss:
Warning

Concurrent writes to the same Delta table on S3 storage from multiple Spark drivers can lead to data loss. For a multi-cluster solution, please see the Multi-cluster setup section below.
What about maintenance tasks (`OPTIMIZE`, `DELETE`, `VACUUM`)? This doc sheds some light, but there are a few things I'd like to verify:
1. The doc mentions that a file compaction operation (which I'm assuming is what folks used to do before `OPTIMIZE` was open-sourced?) cannot conflict with `INSERT`. Is it safe to assume `OPTIMIZE` and `INSERT` cannot result in a conflict even if they operate on the same set of files (e.g. the table is partitioned by `DATE` and both operations work on the same day)?
2. `VACUUM` is not mentioned at all in this doc - I'm assuming (due to the nature of the `VACUUM` operation) it cannot conflict with any of the other operations, correct?
Thanks!

Dominique Brezinski

07/17/2023, 2:04 PM
First, the doc points to how to set up Delta to be concurrent-writer safe across multiple clusters. VACUUM is safe due to how it operates: it is not making changes to the log or active data files. Compaction is what OPTIMIZE does (either with ordering or without). Compaction does not conflict with appends/inserts, even on the same partition, because appends/inserts always write new files, and compaction happens only on existing files. However, those operations still need to be done from the same driver, or with the multi-cluster write feature configured, to be safe, because they both update the delta log.
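As an illustration of why the file sets cannot overlap, hand-written (not real) log entries for the two commits might look like this; note that compaction marks its actions with `dataChange: false`, since it only rewrites existing data:
```
00000000000000000012.json  (INSERT):
  {"add":    {"path": "date=2023-07-17/part-00042.parquet", "dataChange": true,  ...}}

00000000000000000013.json  (OPTIMIZE):
  {"remove": {"path": "date=2023-07-17/part-00007.parquet", "dataChange": false, ...}}
  {"add":    {"path": "date=2023-07-17/part-00101.parquet", "dataChange": false, ...}}
```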

Itai Yaffe

07/17/2023, 4:11 PM
@Dominique Brezinski - thanks for the fast, detailed response!
Per compaction vs. OPTIMIZE - I totally agree with your comment; I just think the doc could be better phrased. Since we now have the OPTIMIZE statement, I would assume folks will be using that (rather than this method), right?
Per compaction conflicts with inserts - your last sentence is a bit concerning to me, TBH. You're saying I can't have OPTIMIZE and inserts in parallel to the same table from different drivers without multi-cluster enabled, correct? Let's take the following scenario:
1. A Structured Streaming job that inserts new records into a Delta table.
2. A periodic OPTIMIZE job that compacts the same table. #1 and #2 are usually 2 separate jobs, on separate clusters with separate drivers (but perhaps that's not mandatory?).
3. The storage layer is an "on-prem" object storage that's S3-compatible. Similar to S3, it doesn't provide atomic renames, and (since it's not AWS) there's no DynamoDB, which is required by the multi-cluster setup.
What options do you think are available in this case? I guess one option is to somehow periodically run OPTIMIZE from the same driver that's running the Structured Streaming job (I guess that's doable, although it doesn't seem ideal). Anything else you can think of? Thanks again!

Dominique Brezinski

07/17/2023, 4:26 PM
Yes, I believe that is all correct. You can inline OPTIMIZE in the streaming job running it every modulo N batches, but that will introduce latency in those micro-batches. The right answer is to use the API interfaces and implement your own table lock against an internal system that supports the right semantics. I think someone recently posted they are working on a project to use Redis for the lock. That would be a great contribution for people running on internal S3 API compliant object stores without atomic rename.
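A minimal sketch of the inline approach, assuming a PySpark job with Delta's SQL extensions enabled; the paths and the every-50-batches cadence are illustrative:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
OPTIMIZE_EVERY_N_BATCHES = 50  # illustrative cadence; tune for your latency budget

def write_batch(micro_batch_df, batch_id):
    # Normal append for this micro-batch.
    micro_batch_df.write.format("delta").mode("append").save("/tables/events")
    # Same-driver compaction every N batches. Because it runs on the same
    # driver as the appends, the two commits cannot race on the delta log,
    # but it does block (and so delays) this particular micro-batch.
    if batch_id % OPTIMIZE_EVERY_N_BATCHES == 0:
        spark.sql("OPTIMIZE delta.`/tables/events`")

source_df = spark.readStream.format("delta").load("/tables/raw_events")
(source_df.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/checkpoints/events")
    .start())
```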

Itai Yaffe

07/18/2023, 8:56 AM
Thanks! I think I found the thread you were referring to re: Redis (this thread by @S Thelin, @rtyler, and @John Darrington). There's another thread here (by @Mike Eastham), but it seems more specific to `delta-rs`, IIUC. Anyway, we'll have to take a closer look and evaluate the effort. Thanks again for your help, @Dominique Brezinski!

S Thelin

07/18/2023, 9:28 AM
@Itai Yaffe not sure if this helps, but if you're running native Spark and the collisions are rare, you can just capture the collision exception and retry. The Redis lock I wrote is not for native Spark, but just for the delta-rs Python binding. There is a better implementation with DynamoDB, which works with the delta-rs project without modifying or wrapping any code.
That's correct, yes. I currently run Spark on k8s, and I run vacuum at very specific times when I know nothing else is interfering with it.
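A small sketch of that capture-and-retry idea in PySpark, assuming the LogStore can actually detect the conflict (on plain S3 without the multi-cluster setup the collision is silent, so there is nothing to catch); the wrapper name and retry count are illustrative:
```python
from delta.exceptions import DeltaConcurrentModificationException

def write_with_retry(write_fn, max_attempts=3):
    # write_fn is any callable that performs a single Delta commit.
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()
        except DeltaConcurrentModificationException:
            # Delta re-reads the latest snapshot on the next attempt, so the
            # retry is based on the winning writer's committed state.
            if attempt == max_attempts:
                raise
```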

Itai Yaffe

07/18/2023, 10:43 AM
OK, so we still need some kind of multi-cluster solution. BTW, specifically w.r.t. `VACUUM`, @Dominique Brezinski said it's safe (as it doesn't change the log), so is there a reason you only run vacuum at specific times, when nothing else is running? Also, how are you running `OPTIMIZE` and `DELETE` (which are not safe)?

S Thelin

07/18/2023, 10:56 AM
I use soft-delete logic on my bronze tables: I never delete on the bronze layer, and then run the actual deletes downstream, on silver. This way I handle this a bit better. But yes, you probably need to mimic what Databricks does. I'm going to look into how I can do this with Spark on k8s.
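A sketch of that soft-delete split, reusing the `spark` session from the earlier sketch, with hypothetical table names and flag column (`bronze.events`, `silver.events`, `is_deleted`):
```python
# Bronze rows are only flagged, never physically deleted, so no data files
# are ever removed underneath the streaming writers on that layer.
spark.sql("""
    UPDATE bronze.events
    SET is_deleted = true
    WHERE event_date = '2023-07-17' AND user_id = 'u-123'
""")

# The physical DELETE (the operation that can conflict) runs downstream on
# silver, where the write schedule is fully controlled.
spark.sql("DELETE FROM silver.events WHERE is_deleted = true")
```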

Itai Yaffe

07/18/2023, 2:14 PM
Gotcha. Please let me know what you find 😉
Since we established, per @Dominique Brezinski's comment, that even operations that cannot logically conflict (e.g. `OPTIMIZE` and `INSERT`) can still lead to data loss if executed from separate clusters without multi-cluster support enabled: any other thoughts/insights on using DynamoDB in an "unconventional" manner for multi-cluster support (on a private cloud with no DynamoDB available), either by:
1. Storing the data+metadata on an S3-compatible object storage, but using a "remote" DynamoDB on AWS; OR
2. Using a DynamoDB-compatible replacement like ScyllaDB (https://www.scylladb.com/alternator/, per @Christian Pfarr's comment on this issue)?
Thanks!

Dominique Brezinski

07/19/2023, 1:40 PM
To be clear, we are just talking about an alternative lock provider to DynamoDB for people using on-prem S3-compatible object stores. The transaction log and data are always stored in the object store. But if the object store does not support atomic rename, then a lock is necessary to safely commit a transaction to the log when non-same-driver writers are running concurrently.
👍 2

Itai Yaffe

07/19/2023, 1:54 PM
@Dominique Brezinski - yep, agreed! That's why I was wondering, as a follow-up question, if anyone has any insights/thoughts/experience with using a "remote" DynamoDB on AWS (when the object store resides in a private cloud), or perhaps using a DynamoDB-compatible replacement like ScyllaDB.

Dominique Brezinski

07/19/2023, 2:52 PM
Using the supported DynamoDB solution will work with any S3 compatible object store that you can provision access to. The only reason to do anything else is if you really don’t want to pay the nominal DDB costs or can’t access AWS for any reason.

S Thelin

07/19/2023, 3:23 PM
Are there any docs anywhere on using that with multiple Spark clusters on k8s?
I currently use the open-source Spark operator to submit jobs to k8s.

Itai Yaffe

07/20/2023, 1:43 PM
@Dominique Brezinski - that's helpful, even as an interim solution, thanks! If I may (and sorry for digging into this) - @Gilad Asulin and I chatted about that, and I've gone over most of the Delta Transaction Log Protocol, but we wanted to better understand what it means to "update the delta log" and how data loss can occur:
1. IIUC, individual files that are part of the delta log are never changed. Both delta log entries (the JSON Delta files in the `_delta_log` folder) and the checkpoints (stored in Parquet) are immutable. Each modification of the table is recorded to the transaction log via a new Delta JSON file.
2. The only time a file in the transaction log is overwritten is when a writer produces a new checkpoint file, since it also needs to overwrite `_delta_log/_last_checkpoint` to point to that new checkpoint file.
3. If #1 and #2 are correct, then data loss (in the case of an S3-like object store without multi-cluster support enabled) can occur if writer A wrote a new delta log file, and writer B immediately wrote a new delta log file with the same name, essentially overwriting the previous file, due to the non-atomic nature of the S3-like object store (e.g. per this paragraph).
Would that be a reasonable description? Thanks a lot!

Dominique Brezinski

07/20/2023, 2:12 PM
Correct! In that case the first writer's commit would be lost. When the storage system supports an atomic test/set, the overwrite is protected against. Otherwise we need a lock to protect the operation.
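A conceptual sketch of the commit step being described (not the actual Delta implementation; `put_if_absent` is a hypothetical primitive standing in for whatever atomic create the storage or lock provider offers):
```python
def commit(version: int, actions_json: str) -> None:
    target = f"_delta_log/{version:020d}.json"
    # With atomic test/set semantics, this call fails if `target` already
    # exists, so the losing writer detects the conflict and retries at
    # version + 1. A plain S3 PUT overwrites silently, which is exactly
    # the data-loss case described above.
    put_if_absent(target, actions_json)
```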

Itai Yaffe

07/24/2023, 11:05 AM
Thanks, @Dominique Brezinski! I'm assuming it's not the last time someone will ask such a question, so for future folks who stumble upon this thread, here are a couple of additional relevant links:
1. https://www.cuusoo.com.au/guide/navigating-the-data-lake-using-rust-part-two (the explanation there that starts with the words "Oops! It did not create a delta table as expected" is interesting, and not specific to Rust)
2. https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/
👍 1
@Scott Haines

S Thelin

07/24/2023, 3:17 PM
I assume this should work as well for Spark on k8s:
spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore
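A fuller sketch of the documented S3DynamoDBLogStore configuration, assuming the `delta-storage-s3-dynamodb` artifact is on every writer's classpath; the DynamoDB table name and region below are illustrative:
```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Route s3a:// tables through the DynamoDB-backed LogStore
    # (repeat for the s3:// scheme if your paths use it).
    .config("spark.delta.logStore.s3a.impl",
            "io.delta.storage.S3DynamoDBLogStore")
    # DynamoDB table that serializes commits; per the Delta docs it is
    # created automatically (with default provisioned throughput) if absent.
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName",
            "delta_log")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region",
            "us-east-1")
    .getOrCreate()
)
```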

Scott Haines

07/24/2023, 5:27 PM
Hey @Itai Yaffe. I just caught up on the thread. For multi-writer support (without table locking), each writer will have the following "views" of a given table:
1. a local DeltaSnapshot (cached to create the base of any new transactions)
2. the last transaction (version), based on the local Snapshot
On top of that you have the table isolation modes (`delta.isolationLevel`), supporting Serializable, WriteSerializable, or SnapshotIsolation. `Serializable` is the most aggressive - table history is always accurate. This is probably what you want in your case.
• To the point made by @S Thelin - you can use the thrown exceptions to retry. This behavior will differ for batch jobs vs. streaming jobs (batch is probably easier: let the job fail and retry via orchestration). On the next attempt, the modified DeltaSnapshot will be re-read (with the new changes) if the collision is captured as a `ConcurrentModificationException`, via the operation-specific subclasses (`ConcurrentDeleteReadException`, etc.).
• In the case of OPTIMIZE, while it can collide, a useful pattern is optimizing only "sealed" partitions. For example, if the job writes to daily partitions, then the prefix will be fairly noisy (read/write heavy) on the day of, but afterwards you have less chance of collision. (This is the simplest solution.)
Alternatives: you can use the Delta Lake tblproperties to coordinate changes, or to react to complex coordination. `MetadataChangedException` (https://docs.delta.io/latest/concurrency-control.html#metadatachangedexception) can be used to lock and unlock a table. If an external process is being used to run periodic OPTIMIZE, or other transactions on a shared table, then create something like `platform.ops.transactions.lock.enabled=true, platform.ops.transactions.lock.owner=X, platform.ops.transactions.lock.operation=OPTIMIZE`, which can be added or removed using `ALTER TABLE {x} SET TBLPROPERTIES (key=value, ...)`. The `MetadataChangedException` will halt streaming pipelines, so if `DeltaTable.forName(spark, "catalog.schema.table").properties()` contains any information that can be used to coordinate, then it is more work, but it can essentially be used to do the locking and unlocking (see the sketch after this message).
The idea of using a remote DynamoDB with STS also feels like a good pattern if it helps get you all one step forward, since the alternatives can be more complicated.
As for coordination via the Delta Lake tblproperties - you can test how this works by creating a new Delta Lake table, then creating one streaming reader of the table (let the process await new transactions periodically - every 30 seconds), then modifying the table using ALTER TABLE and watching how the streaming application handles the modifications.
• You can read the exceptions on the Spark side using the StreamingQueryListener, or just by checking `streamingQuery.lastProgress` after any exception is thrown, before the application quits.
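A sketch of that tblproperties-as-lock idea in PySpark, using the hypothetical `platform.ops.transactions.lock.*` keys from above; note the check-then-set is only best-effort unless the underlying commits are already conflict-safe:
```python
def is_locked(spark, table: str) -> bool:
    rows = spark.sql(f"SHOW TBLPROPERTIES {table}").collect()
    props = {r["key"]: r["value"] for r in rows}
    return props.get("platform.ops.transactions.lock.enabled") == "true"

def acquire_lock(spark, table: str, owner: str, operation: str) -> None:
    # Best-effort check-then-set: two writers can still race between the
    # check and the ALTER TABLE unless the underlying commit is atomic.
    if is_locked(spark, table):
        raise RuntimeError(f"{table} is locked by another writer")
    spark.sql(
        f"ALTER TABLE {table} SET TBLPROPERTIES ("
        "'platform.ops.transactions.lock.enabled' = 'true', "
        f"'platform.ops.transactions.lock.owner' = '{owner}', "
        f"'platform.ops.transactions.lock.operation' = '{operation}')"
    )

def release_lock(spark, table: str) -> None:
    spark.sql(
        f"ALTER TABLE {table} UNSET TBLPROPERTIES ("
        "'platform.ops.transactions.lock.enabled', "
        "'platform.ops.transactions.lock.owner', "
        "'platform.ops.transactions.lock.operation')"
    )
```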

Itai Yaffe

07/25/2023, 8:00 AM
@Scott Haines - thanks for all the info! Coordinating changes via Delta's `tblproperties` is interesting. A few follow-up questions/comments:
1. Just to clarify: in the no-multi-cluster-support scenario, regardless of which table isolation mode I'm using, no conflict exceptions will be thrown - we'll just have data loss. This statement still holds, correct?
2. Per using Delta's `tblproperties` - I believe I will have to programmatically check `DeltaTable.forName(spark, "catalog.schema.table").properties()` before initiating any write, and if it contains the "custom-lock properties", I need to throw the exception myself, correct?
3. Maybe I'm missing something, but w.r.t. the test setup you mentioned (having a streaming reader and running an `ALTER TABLE` from another process) - per the docs, `MetadataChangedException` occurs when a concurrent transaction updates the metadata of a Delta table. Why would the streaming reader get this exception when another process alters the table?
Thanks!

Scott Haines

07/26/2023, 4:25 PM
1: There are shades of gray here, the big one being simultaneous commits affecting the same version of the delta table. So it depends. With the right combination of changes being made on the table, that could happen. I think you would see this if you are doing operations like conditional upserts, where things fall into the MERGE style of modifications. If you are doing ZORDER BY or bin-packing OPTIMIZE on an entire table, or an entire partition of a given table, while writing into the same table, then you'd also have to hit the sweet spot (so it is possible, and the STS-to-a-common-DynamoDB locking will probably be the only way to ensure write-safety).
2: Yes. This technique leans on the fact that the physical table is the point of table synchronicity. If we use the invariants of Delta (and we don't have collisions per 1), then you can use the tblproperties like a distributed lock if you are checking and short-circuiting - there are tricks to soft-restart an application without tearing down your entire application, if you want to talk about that.
3: The streaming process is scanning the delta table, and the exception is thrown to coordinate. Given that the metadata change can be a modification to the reader or writer versions (in the case of upgrading the protocol), it can be harmless, as with non-`delta.*` properties, or it can be a change affecting the `schema`, or a table overwrite, as in the case of table-restoration patterns with live consumers.

Itai Yaffe

07/27/2023, 2:12 PM
Thanks for the additional info, @Scott Haines! I'll further discuss this with @Gilad Asulin and we'll see which route we want to try (e.g. the "remote" DynamoDB, or using the `tblproperties`, or something else), before potentially getting back to this thread with follow-ups. Thanks 🙂
👍 1

Dominique Brezinski

07/27/2023, 2:17 PM
You really want to do the remote lock. It is the only hard guarantee that there is no corruption on concurrent writes.
💯 1

Itai Yaffe

07/27/2023, 2:22 PM
Thanks, @Dominique Brezinski! We're discussing it internally (and I agree with the preference for using the remote lock, or potentially implementing a version of `LogStore` that relies on something other than DynamoDB).
👍 2