Szymon Sikora
08/01/2023, 6:19 PM
join_condition = f"target.category_id = '{category_id}' AND target.checksum == source.checksum"
delete_condition = f"target.category_id = '{category_id}'"
update_condition = "target.uuid <> source.uuid"
update_set = {"column_1": "source.column_1", "column_2": "source.column_2"}
merge_statement = (
    target_delta_table.alias("target")
    .merge(df.alias("source"), join_condition)
    .whenNotMatchedInsertAll()
    .whenNotMatchedBySourceDelete(condition=delete_condition)
    .whenMatchedUpdate(condition=update_condition, set=update_set)
)
category_id is a unique value injected by each process. I've followed the guide from https://docs.delta.io/latest/concurrency-control.html#avoid-conflicts-using-partitioning-and-disjoint-command-conditions and explicitly stated my partition (_category_id_) in the merge join condition. Am I missing something here? How can I inform the Delta Lake API that this operation is bound to the selected partition?
Dominique Brezinski
08/01/2023, 6:52 PM
.whenNotMatchedInsertAll() is not partition disjoint from the other concurrent operations, so you are getting the concurrent append exception.
Szymon Sikora
08/01/2023, 9:02 PM