
Sukumar Nataraj

09/06/2023, 8:56 AM
Hello Team, we are currently using one of our columns as the partition key, and it contains over 10k unique values. Unfortunately, due to compliance reasons, we cannot change this partition key. Our data lakehouse is built using MySQL CDC, Debezium, and Kafka for incremental data processing, and we're also using Spark Structured Streaming with a 15-minute micro-batch interval. Our setup is Delta 2.2.0, Spark 3.3.1, and EMR 6.10.0. The challenge we're facing is that when we receive incremental data for partitions with high volumes (up to 1GB), our MERGE operations take significantly longer to complete due to data skew. Our analysis of the MERGE metrics reveals that the scanTime is relatively short, but the rewriteTime is extensive. We've come across a property in the DeltaSQLConf file,
spark.databricks.delta.merge.repartitionBeforeWrite.enabled
, which is set to true by default. Disabling this property improved some of our MERGE times, but over time it increased scanTime due to the proliferation of small files and the associated increase in S3 API costs. Setting spark.databricks.delta.merge.repartitionBeforeWrite.enabled back to true mitigates the small-file issue but can exacerbate executionTime. To address this trade-off, we introduced a secondary partitioning scheme using a
hash_value
on top of our primary partition column. The
hash_value
ranges from 5 to 20, depending on the table's volume, and is generated using one of the primary key columns. We combined this with repartitionBeforeWrite set to true. This approach serves two main purposes:
• repartitionBeforeWrite helps us address the issue of too many small files.
• Hash partitioning allows us to split large partitions into smaller files.
For example:
a=1/hash_id=0, a=1/hash_id=1, a=1/hash_id=(n-1)
a=2/hash_id=0, a=2/hash_id=1, a=2/hash_id=(n-1)
a=3/hash_id=0, a=3/hash_id=1, a=3/hash_id=(n-1)
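A minimal PySpark sketch of how such a hash_id column could be derived (the primary key column id, the source DataFrame incoming_batch_df, and the bucket count are hypothetical stand-ins for the per-table values described above):

from pyspark.sql import functions as F

# Per-table bucket count; the post describes values between 5 and 20.
N_BUCKETS = 10

# Keep Delta's default repartition-before-write behaviour enabled, as described above.
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")

# Derive a stable, non-negative bucket id from one of the primary key columns.
# SQL pmod (rather than %) guarantees a non-negative result even when
# hash() returns a negative integer.
source_df = incoming_batch_df.withColumn(
    "hash_id", F.expr(f"pmod(hash(id), {N_BUCKETS})")
)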
We anticipate the following outcomes:
• Increased read latency, because reads now require scanning inside each hash_value partition.
• A potential increase in S3 costs due to small files being split into even smaller files (although, if the volume is low, this may not be a frequent issue).
• Slight increases in scanTime and rewriteTime during writes.
While the proposals below from the Delta community may ultimately address our problem, we're looking for an immediate solution.
https://github.com/delta-io/delta/pull/1198
https://github.com/delta-io/delta/issues/500
https://github.com/delta-io/delta/issues/1874
We'd greatly appreciate your opinion on this approach, its downsides, and potential impact. Thanks for your assistance.
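For the MERGE step itself, one pattern often used with such two-level layouts is to include both partition columns in the merge condition so Delta can prune untouched buckets rather than rewriting files across the whole table. A hedged sketch using the Delta Python API, assuming the source_df from the previous sketch and a hypothetical table path and column names:

from delta.tables import DeltaTable

# Hypothetical target table path.
target = DeltaTable.forPath(spark, "s3://my-bucket/tables/events")

(
    target.alias("t")
    .merge(
        source_df.alias("s"),
        # Constraining both partition levels (a and hash_id) in addition to
        # the primary key allows partition pruning during the MERGE scan.
        "t.a = s.a AND t.hash_id = s.hash_id AND t.id = s.id",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)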
Can someone help me on this? Thanks.

Ganesh M

09/09/2023, 3:08 PM
Can someone please assist on this? Much appreciated!