
Samir Vyas

06/19/2023, 9:17 AM
Hi everyone, I have a use case where I need to regularly delete rows from a partitioned Delta table, where the ids to delete come from a DataFrame. I use the following:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{broadcast, col}

val table = DeltaTable.forPath(spark, <path>)

// table schema could be like this:
// id: int, ...some other columns, p1: string, p2: string
// here p1 and p2 are partition columns, in the order p1, p2

val df = <data frame with ids to delete, could be around 10k ids>

table
  .as("t")
  .merge(
    broadcast(df).as("s"),
    // the p1 predicate is meant to prune the target scan to a few partitions
    col("t.p1").isin(<some p1 values from which I want to delete ids>) && col("t.id") === col("s.id")
  )
  .whenMatched()
  .delete()
  .execute()
Now I want to preserve the partitioning structure after the above operation, so I set spark.databricks.delta.merge.repartitionBeforeWrite.enabled=true. When I run the above, I notice a significant amount of shuffle: Spark reads data from all partitions of p1 that match the passed values, removes the matching rows using a broadcast join, and then performs a repartition + partitionBy before writing, to preserve the partitioning structure (p1 -> p2).

I feel the shuffle is avoidable here because the merge operation does not change the partitioning structure. That is, Spark could go to each partition, replace each older file with a new file from which the ids are deleted, and create entries for these new files in the Delta log. Is there any method or config I can use to avoid the shuffle in this operation? I currently use Delta Lake 0.8.0. Thanks in advance!
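One workaround that might fit this case, since the id set is only ~10k rows: collect the ids to the driver and issue a filtered DELETE instead of a MERGE. As far as I know, a filtered DELETE rewrites only the files containing matching rows, file by file, so the existing partition layout is preserved without a repartition-before-write. A minimal sketch, assuming the ids fit comfortably on the driver; the path and p1 values are hypothetical placeholders:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// hypothetical placeholders for this sketch
val path = "/delta/my_table"             // your table's path
val p1Values = Seq("2023-01", "2023-02") // the p1 partitions to touch

// collect the ~10k ids to the driver; small enough to inline in a predicate
val idsToDelete = df.select("id").collect().map(_.getInt(0))

DeltaTable.forPath(spark, path)
  .delete(
    // the p1 predicate prunes to the affected partitions; Delta then rewrites
    // only the files that contain matching ids, with no join involved
    col("p1").isin(p1Values: _*) && col("id").isin(idsToDelete: _*)
  )

A 10k-element IN list does make for a large predicate, but in my experience that is usually cheaper than a full merge rewrite; worth benchmarking both on your data.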

Michael Nacey

06/27/2023, 8:35 PM
I could be wrong, but I think your code is causing a join, which will shuffle. Look at the execution plan in the Spark History Server.
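To complement the History Server check, Delta also records per-operation metrics in the table history, which can show how much data a MERGE actually rewrote. A quick sketch, assuming a hypothetical table path (operationMetrics is populated in recent Delta releases, depending on version and config):

import io.delta.tables.DeltaTable

val table = DeltaTable.forPath(spark, "/delta/my_table") // hypothetical path

// metrics for the most recent commit; for a MERGE these include counts such as
// numTargetFilesAdded, numTargetFilesRemoved and numTargetRowsDeleted
table.history(1)
  .select("operation", "operationMetrics")
  .show(truncate = false)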