
Gautam Venugopal

05/02/2023, 4:09 PM
Hi, I am working on optimizing a Spark Delta merge batch job using Delta Lake OSS 2.2 on EMR 6.10, and I am looking for advice on how to tune the write performance. This is a daily batch job where 1 partition in our source table maps to multiple partitions in our target table. I wanted to start with questions on a few properties:
1. spark.databricks.delta.merge.repartitionBeforeWrite.enabled - should this be set to false? When set to true, I noticed it produced 1 large file per partition.
2. spark.databricks.delta.merge.materializeSource - I noticed the default for this is "auto". It seemed to consistently write the source data to disk, which caused much slower run times. I set this to "none", and it then seemed to read from memory. Any issues with this approach?
3. AQE - should we keep this on or off? With spark.databricks.delta.merge.repartitionBeforeWrite.enabled set to false, I noticed that the number of files produced per partition matched spark.sql.adaptive.coalescePartitions.initialPartitionNum (which is 1000 by default).
4. Any recommendations for how we should tune spark.sql.shuffle.partitions? When spark.databricks.delta.merge.repartitionBeforeWrite.enabled = false, the value of spark.sql.shuffle.partitions seems to map to the number of files in each partition.
5. Are there any other properties in 2.2 we should explore that can benefit the Delta merge?
6. In our case, the large majority of the source data for the merge goes to 5 partitions (for the current batch load date) in the target table. Is there any way we can leverage this information to optimize the merge process?
@Justin Mark Santilli @Kimberly Mahoney @Shinoy Bhaskaran
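For reference, the properties in question can be passed as spark-submit flags. This is a minimal sketch with the values mentioned in the thread; the values and the job script name are illustrative placeholders, not tested recommendations:

```shell
# Sketch only: the merge-tuning properties discussed above, set via
# spark-submit --conf. Values mirror the discussion, not a recommendation;
# your_merge_job.py is a placeholder.
spark-submit \
  --conf spark.databricks.delta.merge.repartitionBeforeWrite.enabled=false \
  --conf spark.databricks.delta.merge.materializeSource=none \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.shuffle.partitions=1000 \
  your_merge_job.py
```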

JosephK (exDatabricks)

05/02/2023, 4:59 PM
3. Never turn off AQE

Adam Binford

05/03/2023, 2:18 AM
FWIW, we also had to manually disable source materialization because it was being injected even though our merges were deterministic
Also, we run a custom build with the optimized write PR, which helps our merges a lot; it would be great if that ever got merged. It works like repartitionBeforeWrite but splits big partitions
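For intuition, the splitting behavior described here can be sketched with simple size math. This is a toy model of the idea (target-sized output files, big partitions split, small ones left alone), not the actual logic in the PR:

```python
import math

# Toy model of optimized write's rebalancing idea: aim for output files
# near a target size, so a large shuffle partition is split into several
# files instead of one huge file. Not the real Delta implementation.
def planned_output_files(partition_bytes: int,
                         target_bytes: int = 128 * 1024 * 1024) -> int:
    """Number of files a partition of the given size would be split into,
    assuming a simple ceil(size / target) policy."""
    return max(1, math.ceil(partition_bytes / target_bytes))

# A 1 GiB partition splits into ~8 files of ~128 MiB each; a 10 MiB
# partition stays as a single small file.
print(planned_output_files(1024 ** 3))         # 8
print(planned_output_files(10 * 1024 * 1024))  # 1
```

Contrast this with repartitionBeforeWrite (one file per output partition, however large) and plain shuffle-partition writes (file count tied to spark.sql.shuffle.partitions, however small each file ends up).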

Justin Mark Santilli

05/03/2023, 1:02 PM
@Nick Karpov @TD any timeline on the AutoCompaction PR?

Gautam Venugopal

05/03/2023, 1:15 PM
@Adam Binford Thanks for sharing. Are you using this PR as your customized build? https://github.com/delta-io/delta/pull/1198

Adam Binford

05/03/2023, 10:08 PM
Yep! Works like a charm

Gautam Venugopal

05/05/2023, 1:30 AM
@Adam Binford Do you mind sharing some of the configs you're using? We thought to give this PR a try, but it seems to be behaving the same with the configs we're using below:
spark.databricks.delta.optimizeWrite.enabled=true
spark.databricks.delta.merge.repartitionBeforeWrite.enabled=false
spark.sql.shuffle.partitions=1000
With these settings, it seems to be generating 1000 files per partition (matching the shuffle partitions).

Adam Binford

05/05/2023, 2:49 AM
I can see if any other config jumps out tomorrow. We're only using it for one merge job right now. Mostly using it for appends

Gautam Venugopal

05/05/2023, 3:12 PM
Thanks @Adam Binford, I tried a few different variations, but for some reason I am not able to get it to work with either appends or merges. Do you happen to be running this on EMR?

Adam Binford

05/06/2023, 1:59 PM
No, this is on-prem. I didn't see any other settings standing out, just what you're using. You should be able to see an OptimizeWriteExchange in the Job visualization of the write-data phase (unfortunately it doesn't show up in the SQL plan because of how it gets injected) to check whether it's actually taking effect.