Ovi
01/05/2023, 1:00 PM
I can't use replaceWhere, because the replace condition is arbitrary and the Delta version doesn't support it (and I can't upgrade it).
Also, I can't use merge, because there's no one-to-one mapping between the source DataFrame and the target Delta table.
Any advice is welcome.
Thank you!
PS: I need to wrap these commands (delete and insert) into a transaction, if possible
import io.delta.tables.DeltaTable

// delete the rows matching the condition from the target Delta table
DeltaTable.forPath(deltaTableLocation).delete(dataCondition)

// insert the filtered source rows; append so the rest of the table is preserved
spark.read
  .load(sourceDataLocation)
  .filter(dataCondition)
  .write
  .format("delta")
  .option("mergeSchema", "true")
  .mode("append")
  .save(deltaTableLocation)
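For reference, this is the replaceWhere path that's blocked for me (a sketch; it assumes dataCondition is a plain SQL string, and arbitrary, non-partition replace conditions only work on newer Delta releases than the one I'm stuck on):

// replaceWhere would do the delete + insert as one atomic overwrite of just the matching rows,
// if the Delta version supported arbitrary (non-partition) replace conditions
spark.read
  .load(sourceDataLocation)
  .filter(dataCondition)
  .write
  .format("delta")
  .option("replaceWhere", dataCondition)
  .mode("overwrite")
  .save(deltaTableLocation)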
Jon Stockham
01/05/2023, 2:04 PM
1. Read in your source data, filtered by dataCondition, as df_new.
2. Generate a unique key value for each row in df_new, perhaps with monotonically_increasing_id() from pyspark.sql.functions, but you would need to start from the max value already in your target data.
3. Read in your existing data, filtered by dataCondition, as df_deletes.
4. Union df_new and df_deletes.
5. Delta merge on the unioned df, matching on the generated key column. On match then delete, on no match then insert (rough sketch below).
I'm not nearly as experienced as most in here but at the least hopefully I can inspire you to come up with something more efficient.
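Roughly, and sticking with Scala to match the snippet above, it could look something like the following. It's an untested sketch: row_key is just a placeholder for a unique numeric key column the target table is assumed to already have, and sourceDataLocation, deltaTableLocation and dataCondition are the names from the original code.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{coalesce, lit, max, monotonically_increasing_id}

// step 1: the new data, filtered by the condition
val dfNew = spark.read.load(sourceDataLocation).filter(dataCondition)

val target = DeltaTable.forPath(deltaTableLocation)

// step 2: generate keys for the new rows, starting above the current max in the target
// ("row_key" is a placeholder; the target is assumed to already have this numeric key column)
val maxKey = target.toDF.agg(coalesce(max("row_key"), lit(0L))).first().getLong(0)
val dfNewKeyed = dfNew.withColumn("row_key", monotonically_increasing_id() + lit(maxKey + 1L))

// step 3: the existing rows that should be deleted (they keep their original keys)
val dfDeletes = target.toDF.filter(dataCondition)

// step 4: one source DataFrame holding both sets of rows (allowMissingColumns needs Spark 3.1+)
val source = dfNewKeyed.unionByName(dfDeletes, allowMissingColumns = true)

// step 5: a single merge, i.e. a single Delta transaction:
// old rows match on row_key and are deleted, new rows don't match and are inserted
target.as("t")
  .merge(source.as("s"), "t.row_key = s.row_key")
  .whenMatched().delete()
  .whenNotMatched().insertAll()
  .execute()

Because everything goes through a single merge, the delete and the insert land in one Delta commit, which should cover the transaction requirement from the PS. If the source can bring new columns, the merge would additionally need automatic schema evolution turned on (spark.databricks.delta.schema.autoMerge.enabled).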
Nick Karpov
01/05/2023, 2:07 PM
Ovi
01/05/2023, 2:38 PM
Nick Karpov
01/05/2023, 3:19 PM
Ovi
01/05/2023, 4:04 PM