Samir Vyas 06/19/2023, 9:17 AM
Now I want to preserve the partitioning structure after the above operation, so I set spark.databricks.delta.merge.repartitionBeforeWrite.enabled=true. When I run it, I notice a significant amount of shuffle. Apparently, Spark reads the data from all partitions of p1 that match the passed values, removes the matching rows using a broadcast join, and then performs a repartition -> partitionBy before writing to preserve the partitioning structure (p1 -> p2). I feel the shuffle could be avoided here because the merge operation does not change the partitioning structure, i.e. Spark could go to each partition, replace the old file with a new file from which the ids have been deleted, and add entries for these new files to the Delta log. Is there any method or config I can use to avoid the shuffle in the above operation? I currently use Delta Lake 0.8.0. Thanks in advance!
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{broadcast, col}

val table = DeltaTable.forPath(spark, <path>)
// The table schema could look like this:
//   id: int, ...some other columns, p1: string, p2: string
// p1 and p2 are the partition columns, in that order.
val df = <data frame with ids to delete, could be around 10k ids>
table
  .merge(
    broadcast(df),
    col("p1").isin(<some p1 values from which I want to delete ids>) && col("id") === df("id")
  )
  .whenMatched()
  .delete()
  .execute()
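To make the intuition behind the question concrete, here is a toy model in plain Scala. It is not Delta Lake code and says nothing about how Delta implements merge; the names `PartitionLocalDelete`, `Row`, `Table`, and `deleteIds` are made up for illustration. It only shows that deleting ids rewrites rows inside the partitions they already live in, so no row ever needs to move to a different (p1, p2) partition.

```scala
// Toy model only (an assumption for illustration, not Delta Lake's implementation):
// a "table" is a map from a partition key (p1, p2) to the rows stored in it.
object PartitionLocalDelete {
  final case class Row(id: Int, p1: String, p2: String)

  type Table = Map[(String, String), Vector[Row]]

  // Delete the given ids, touching only partitions whose p1 is in p1Values.
  // Every affected partition is rewritten under its own key, so the
  // partition layout (the set of keys) is unchanged.
  def deleteIds(table: Table, p1Values: Set[String], ids: Set[Int]): Table =
    table.map { case (key @ (p1, _), rows) =>
      if (p1Values.contains(p1)) key -> rows.filterNot(r => ids.contains(r.id))
      else key -> rows
    }

  def main(args: Array[String]): Unit = {
    val table: Table = Map(
      ("a", "x") -> Vector(Row(1, "a", "x"), Row(2, "a", "x")),
      ("a", "y") -> Vector(Row(3, "a", "y")),
      ("b", "x") -> Vector(Row(1, "b", "x"))
    )
    // Delete ids 1 and 3, but only from partitions with p1 = "a".
    val result = deleteIds(table, Set("a"), Set(1, 3))
    result.foreach { case (key, rows) => println(s"$key -> $rows") }
  }
}
```

In this model the row with id 1 under p1 = "b" survives because that partition is never touched, which mirrors the `isin` filter on p1 in the merge condition.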
Michael Nacey 06/27/2023, 8:35 PM