
Matt Richards

03/01/2023, 3:53 PM
Hi 👋 We are using Delta Lake 1.2 (on EMR) and I'm trying to understand why our largest tables all end up with exactly 1,000 files in S3, and why this does not change as we merge in more data. They are of varying sizes (e.g. 1.1 TB / 7B records, 330 GB / 1.5B records, 53 GB / 180M records), but all have 1,000 files even though we have not explicitly repartitioned (nor set shuffle partitions). I know Delta now repartitions before merge by default; could that have an impact? It looks as if there is a max setting at play somewhere?

Scott Sandre (Delta Lake)

03/01/2023, 4:47 PM
This seems odd. Have you double-checked this? Where did you see the 1000 number? Is it paging results?
"I know delta now does repartition before merge by default, could that have an impact?"
No. If the table has 1000 parquet files at time T and you perform a merge at time T+1, Delta will have to rewrite some parquet files, so surely there would be more than 1000 parquet files.

Matt Richards

03/01/2023, 4:49 PM
The Spark UI query plan shows "number of files read: 1000".

Scott Sandre (Delta Lake)

03/01/2023, 4:50 PM
That's different from "the table contains exactly 1000 files", btw.
What query are you running? What do your merges look like?

Matt Richards

03/01/2023, 4:53 PM
I can't paste a screenshot unfortunately... The merges are building a fact table from an upstream event table and joining to dimensions, so they vary in complexity. The upstream source DF has one day of new/updated records that merge into the main fact target.
The files-read number is coming from a Scan parquet step that has no filters; doesn't that tell you the number of actual parquet files?

Scott Sandre (Delta Lake)

03/01/2023, 5:00 PM
"doesn't that tell you the number of actual parquet files?"
I'm just trying to highlight that if you insert 1000 files, then "remove" 1000 files, then insert 1000 more files, you will have 2000 physical files in your table directory, because we tombstone removed files rather than deleting them immediately. So if you insert 1000 files and then do a merge, we may logically remove 100 files and insert 100 new files, and you will have 1100 files in the table directory.
But it seems this isn't helpful to you in debugging this problem 🙂
If you can recreate this using sample data / sample queries, that would probably be the next best step for me to help you here.
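A minimal PySpark sketch of one way to check this, using a hypothetical table path (not one from this thread): DESCRIBE HISTORY exposes per-operation metrics, so you can see how many files each MERGE actually added and removed rather than inferring it from the S3 listing.

```python
# Sketch: inspect the Delta table history to see how many files each MERGE
# added/removed. The table path below is a placeholder, not from the thread,
# and the metric names may vary slightly by Delta version.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

history = spark.sql("DESCRIBE HISTORY delta.`s3://my-bucket/path/to/fact_table`")

(history
    .filter(F.col("operation") == "MERGE")
    .select(
        "version",
        "timestamp",
        F.col("operationMetrics")["numTargetFilesAdded"].alias("files_added"),
        F.col("operationMetrics")["numTargetFilesRemoved"].alias("files_removed"),
    )
    .show(truncate=False))
```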

Matt Richards

03/01/2023, 5:00 PM
🙂
Unfortunately, we don't have a non-prod environment that gets close to the same volumes. The Delta metrics show 1000 files added and 1000 files removed. The merge uses the event key; I thought that phase 1 of the merge was an inner join of source to target to identify the files that will be modified. We have 1,846,155 rows in source merging into 7,522,723,519 in the target, so I am puzzled why it ends up rewriting the entire table, with no change in file count. I can try to replicate it in dev with scaled volumes.
describe detail tells you the size and number of files in each table's current snapshot
👍 1
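A minimal sketch of that tip, again with a placeholder path: DESCRIBE DETAIL reports the current snapshot's numFiles, which is the number to compare against the S3 listing (the S3 prefix also contains tombstoned files until VACUUM runs).

```python
# Sketch: read the current snapshot's file count and size from DESCRIBE DETAIL.
# The table path is a placeholder.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

detail = spark.sql("DESCRIBE DETAIL delta.`s3://my-bucket/path/to/fact_table`")
detail.select("numFiles", "sizeInBytes", "partitionColumns").show(truncate=False)
```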

Matt Richards

03/01/2023, 5:55 PM
Hmm, I just checked a query plan from a dev integration test that uses only a small number of records. The merge there shows an Exchange on the source side with 15 shuffle records, a data size of 9.6 KiB, and number of partitions: 1,000 🤔

chris fish

03/01/2023, 6:19 PM
Sorry, so is this the number of shuffle partitions set somewhere on your cluster? Can you paste the Spark UI screenshot here?

Matt Richards

03/01/2023, 6:34 PM
image.png
As far as I can see there's nothing setting shuffle partitions, but it does look like that's the obvious explanation.
In prod, the equivalent looks like:
Nothing is set in the Spark properties for shuffle partitions, according to the UI Environment tab.

chris fish

03/01/2023, 8:38 PM
Weird, that is very consistently being set to 1000 shuffle partitions.
You can set the shuffle partitions at runtime; it doesn't necessarily show up in the Spark environment UI.
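A small sketch of that point: a value set at runtime with spark.conf.set applies to later shuffles, but it will not appear in the Spark UI's Environment tab, which reflects the configuration captured at application launch (EMR defaults, spark-defaults.conf, --conf flags).

```python
# Sketch: runtime overrides of shuffle partitions are real but invisible in the
# Environment tab of the Spark UI.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Whatever was set at launch, if anything:
print(spark.conf.get("spark.sql.shuffle.partitions", "<not set>"))

# A runtime override takes effect for subsequent shuffles:
spark.conf.set("spark.sql.shuffle.partitions", "1000")
print(spark.conf.get("spark.sql.shuffle.partitions"))  # now "1000"
```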

Matt Richards

03/02/2023, 8:19 AM
@chris fish / @Scott Sandre (Delta Lake) - it looks like EMR has a default setting out of the box of `spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000`: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-performance.html
I noticed that in one query plan there are a couple of shuffles that get partitions = 200, and then later it switches to 1000 with a subsequent AQE coalesce. Is it the case that AQE may not be active at a particular point, in which case we end up with the default shuffle partitions? Do you have any recommendations? Should we leave that default but explicitly set shuffle partitions when we know we need it, or always be explicit?
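One way to sanity-check which of these settings are actually in effect on the cluster is to read them back from the session's runtime config; a minimal sketch, where the key list is just the settings discussed in this thread:

```python
# Sketch: print the effective values of the AQE-related settings mentioned above.
# Keys with no explicit value fall back to the "<not set>" default passed here.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for key in (
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum",
    "spark.sql.shuffle.partitions",
):
    print(key, "=", spark.conf.get(key, "<not set>"))
```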

chris fish

03/02/2023, 6:11 PM
Ah yeah, so that setting only coalesces partitions down. So when you are reading in a lot of data, there might be 10,000 tasks reading in the data, and then it will coalesce down to 1000 tasks during a shuffle. But if the task count never goes above 1000, then that AQE feature will never kick in.

Matt Richards

03/02/2023, 6:37 PM
any thoughts on the best approach here?

chris fish

03/02/2023, 7:10 PM
it sounds like in EMR you still have to manually control the number of partitions here
you could definitely reach out to an EMR specialist and ask them why AQE isn’t performing better

Matt Richards

03/03/2023, 3:26 PM
thanks, I've asked our AWS SA to connect us with a specialist on their side