
Szymon Sikora

08/01/2023, 6:19 PM
Hi all, I keep getting the ConcurrentAppendException for my merge statement. I have a Delta table partitioned by column _category_id_ and I am trying to merge into it data produced by a few concurrent processes. Each of the processes transforms the data related to a separate _category_id_. My merge statement looks like this:
join_condition = f"target.category_id = '{category_id}' AND target.checksum = source.checksum"

delete_condition = f"target.category_id = '{category_id}'"

update_condition = "target.uuid <> source.uuid"
update_set = {"column_1": "source.column_1", "column_2": "source.column_2"}

merge_statement = (
    target_delta_table.alias("target")
    .merge(df.alias("source"), join_condition)
    .whenNotMatchedInsertAll()
    .whenNotMatchedBySourceDelete(condition=delete_condition)
    .whenMatchedUpdate(condition=update_condition, set=update_set)
)
category_id is a unique value injected by each process. I've followed the guide from https://docs.delta.io/latest/concurrency-control.html#avoid-conflicts-using-partitioning-and-disjoint-command-conditions and explicitly stated my partition column (_category_id_) in the merge join condition. Am I missing something here? How can I tell the Delta Lake API that this operation is bound to the selected partition?
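One general mitigation, regardless of the root cause: the concurrency-control docs treat these conflict exceptions as retryable, so the merge can be wrapped in a bounded retry loop. A minimal sketch; the attempt count and backoff are arbitrary choices, not from this thread:

import time

from delta.exceptions import ConcurrentAppendException

# Bounded retry with exponential backoff. ConcurrentAppendException is
# raised when a concurrent transaction adds files this merge has read.
for attempt in range(5):
    try:
        merge_statement.execute()
        break
    except ConcurrentAppendException:
        time.sleep(2 ** attempt)
else:
    raise RuntimeError("merge still conflicting after 5 attempts")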

Dominique Brezinski

08/01/2023, 6:52 PM
Yes, the .whenNotMatchedInsertAll() clause is not partition-disjoint from the other concurrent operations, so you are getting the ConcurrentAppendException.
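If the intent is to make the insert clause itself partition-scoped, whenNotMatchedInsertAll accepts an optional condition. A sketch of that variant; whether this is enough for Delta's conflict detection to prove disjointness depends on the Delta version, so treat it as something to test rather than a guaranteed fix:

merge_statement = (
    target_delta_table.alias("target")
    .merge(df.alias("source"), join_condition)
    # Only insert source rows belonging to this process's partition, so the
    # insert clause cannot write outside category_id.
    .whenNotMatchedInsertAll(condition=f"source.category_id = '{category_id}'")
    .whenNotMatchedBySourceDelete(condition=delete_condition)
    .whenMatchedUpdate(condition=update_condition, set=update_set)
)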

Szymon Sikora

08/01/2023, 9:02 PM
Thanks for your answer. Is there any known workaround for this within a Delta Lake merge statement, or does it require handling the deletion of rows missing from the source as a separate Spark action?
After some testing I've discovered that whenNotMatchedBySourceDelete is in fact causing the exception to be thrown.
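One workaround along the lines Szymon suggests is to drop whenNotMatchedBySourceDelete from the merge and run the deletion as a separate action with an explicit partition predicate. A sketch, assuming the set of source checksums per category is small enough to collect to the driver:

# Merge without the not-matched-by-source clause.
(
    target_delta_table.alias("target")
    .merge(df.alias("source"), join_condition)
    .whenNotMatchedInsertAll()
    .whenMatchedUpdate(condition=update_condition, set=update_set)
    .execute()
)

# Separate, explicitly partition-bound delete: remove rows of this category
# whose checksum no longer appears in the source.
checksums = [row["checksum"] for row in df.select("checksum").distinct().collect()]
if checksums:
    in_list = ", ".join(f"'{c}'" for c in checksums)
    target_delta_table.delete(
        f"category_id = '{category_id}' AND checksum NOT IN ({in_list})"
    )
else:
    # No source rows at all for this category: clear the whole partition.
    target_delta_table.delete(f"category_id = '{category_id}'")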