https://delta.io logo
o

Ovi

01/05/2023, 1:00 PM
Hello everyone, I have a situation where I need to delete and insert data from a delta table atomically, using spark. Is there a way that I could do this within a transaction? Please note that I can't use
replaceWhere
, because the replace condition is arbitrary and the delta version doesn't support it (and I can't upgrade it). Also, I can't use
merge
because there's no one-to-one mapping between the source data frame and the target delta table. Any advice is welcome. Thank you! PS: I need to wrap these commands (delete and insert) into a transaction, if possible
Copy code
DeltaTable.forPath(deltaTableLocation).delete(dataCondition)
                spark.read
                  .load(sourceDataLocation)
                  .filter(dataCondition)
                  .write
                  .format("delta")
                  .option("mergeSchema", "true")
                  .mode(Overwrite)
                  .save(deltaTableLocation)
j

Jon Stockham

01/05/2023, 2:04 PM
I think with the limitations you've described you could perhaps do it by doing the following: 1. read in your new source data, filtered by
dataCondition
, as
df_new
2. generate a unique key value for each row in
df_new
. Perhaps with
monotonically_increasing_id()
from
pyspark.sql.functions
, but you would need to start from the max value already in your target data. 3. read in your existing data, filtered by
dataCondition
, as
df_deletes
4. union
df_new
and
df_deletes
5. delta merge on the unioned df, matching on the generated key column. On match then delete, on no match then insert. I'm not nearly as experienced as most in here but at the least hopefully I can inspire you to come up with something more efficient.
n

Nick Karpov

01/05/2023, 2:07 PM
i think this is a good ask, it's popping up more and more (see this convo for example, just yesterday) - would you mind creating an issue on the delta github?
o

Ovi

01/05/2023, 2:38 PM
Thank you @Jon Stockham! Will try this out...
@Nick Karpov thank you for your suggestion, but I don't think that my use case is relevant enough to be put as an issue on Delta github, as it's easily solvable by a version upgrade. Lets suppose that a new feature for custom transactions would be developed as a result of my issue on github, then, to be able to use it, I would need also to do a version upgrade, thus by this proving itself not be a solution for my use case.
n

Nick Karpov

01/05/2023, 3:19 PM
well it's only easily solvable by an upgrade if such a feature were built! 😄 i will make the issue, no worries... also, one thing to watch out for in the solution proposed by @Jon Stockham (which I think may work but I haven't tested) is it is subject to dirty reads if you have concurrent writers
o

Ovi

01/05/2023, 4:04 PM
Yes, I can have concurrent writers. I'll dig deeper into this. Thank you for the heads-up, Nick!
3 Views