Oliver Angelil

07/29/2023, 6:44 PM
when I run dt.optimize().executeCompaction() on a directory with 2411 parquet files (average file size around 150mb) with OPTIMIZE_TARGET_SIZE at 268mb, there were only 164 added files with file sizes much closer to 268mb. The other 2K+ files were untouched and remain at ~150mb. Any clues why the vast majority of files could not be compacted? Is this a common experience?

Dominique Brezinski

07/29/2023, 7:07 PM
I suspect because 150mb is close enough to the target size, and combining any two makes the new file decently larger than the target. The files that were compacted were probably a bit smaller, so combining 2-3 hit the target size. I suspect if you set the target size to 512-1024MB there would be a substantial difference. Any reason 268MB was selected?
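For reference, a rough sketch of what raising the target size before compacting could look like (spark.databricks.delta.optimize.maxFileSize is a Databricks-specific setting, and the table path here is hypothetical):

from delta.tables import DeltaTable

# Databricks OPTIMIZE target file size: ~1GB instead of the 268MB default
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", str(1024 * 1024 * 1024))

dt = DeltaTable.forPath(spark, "/mnt/my-table")  # hypothetical table path
dt.optimize().executeCompaction()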

Oliver Angelil

07/29/2023, 7:19 PM
Thanks for the tip re target size! 268mb is the default (at least in Databricks). If you have any suggestions for an improved target size let me know. The total table size is 345 GB, and I'm not using hive-style partitioning (i.e. subdirectories): rather all parquet files are in 1 directory. This is time series data in an incremental table - new data is appended daily. I'm running this now to see if it'll improve query times: dt.optimize().executeZOrderBy("TimeStamp").executeCompaction()

Dominique Brezinski

07/29/2023, 7:46 PM
That should write out files around 268MB, because the z-order is a global reorder, so all the data will get commingled and rewritten.
However, you don't need, nor want, to call .executeCompaction() after .executeZOrderBy(). Z-ordering inherently compacts files.
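In other words, a single call along these lines is enough (a sketch, assuming dt is the same DeltaTable handle as in the earlier snippets):

# Z-ordering rewrites the affected files, so no separate executeCompaction() is needed
dt.optimize().executeZOrderBy("TimeStamp")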

Oliver Angelil

07/29/2023, 8:10 PM
Noted - thanks 🙂 btw I am a bit confused by the Databricks recommendation to not partition tables that are less than 1TB. If so, should I df.coalesce(1) my entire table? Also, why would the default OPTIMIZE_TARGET_SIZE be more than 3 orders of magnitude smaller 🤔

Dominique Brezinski

07/29/2023, 8:22 PM
oh, because ideal storage layout of your data involves two things: each parquet file being a size that results in good throughput from the storage system (a smaller number of bigger files, but still enough files for each executor to read to get good parallelism), and ordering on columns that have predicates applied to them so the min/max stats on those columns per file have tighter ranges. This allows selective queries to skip reading the files that cannot contain answers because their min/max range is outside the predicate. It also allows good throughput from storage when a query involves most or all of the table, while at the same time reducing the size of the table log. The data that Databricks has been able to evaluate shows that partitioning on tables under 1TB tends to create too many small files, and that either z-ordering or ingestion time clustering results in a more optimal file layout with respect to size, number, and useful metadata stats.
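As a rough illustration of those per-file min/max stats: each "add" action in the table's _delta_log carries a small JSON stats payload (numRecords, minValues, maxValues), which is what the skipping decision is based on. A sketch of dumping them, with a hypothetical log file name and the TimeStamp column from this thread:

import json

# Each line of a Delta log file is one JSON action; "add" actions describe data files.
with open("/dbfs/mnt/my-table/_delta_log/00000000000000000042.json") as f:
    for line in f:
        action = json.loads(line)
        add = action.get("add")
        if add and add.get("stats"):
            stats = json.loads(add["stats"])  # numRecords, minValues, maxValues for this file
            print(add["path"],
                  stats["minValues"].get("TimeStamp"),
                  stats["maxValues"].get("TimeStamp"))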
🙌 2

Oliver Angelil

07/29/2023, 10:34 PM
I will run some tests with different partition sizes to see which gives the best performance. Would z-ordering still make sense on a very high cardinality column like timestamp, or do you think for best performance I should lower the granularity by creating a date column from timestamp?

Dominique Brezinski

07/29/2023, 10:36 PM
Z-ordering is specifically for high cardinality columns. Don’t partition.
👍 1

Oliver Angelil

07/31/2023, 6:59 PM
I ran a few experiments to test the sensitivity of query times to file size (always querying the same 3 days of data), on the same 345GB table with time series data from 2018-2023:
• original dataset (file sizes ~250mb): query time = 2 seconds
• dt.optimize().executeZOrderBy("TimeStamp") with targetFileSize=1gb: query time = 15 seconds
• dt.optimize().executeZOrderBy("TimeStamp") with targetFileSize=100mb: query time = 1.2 seconds
• just for fun: repartition(1326) to shuffle all data: query time = 20 mins 🙃
The trend looks like the smaller the parquet file, the faster the query... I'm not so convinced that having a single parquet file of 345GB would be a good idea 😄
I'm running the query on a single driver node
perhaps if I query more than just 3 days of data (say 1 year) the larger file sizes will be quicker...

Dominique Brezinski

07/31/2023, 8:08 PM
Hahhah, nobody said anything about a single 345GB parquet file. Yeah, so your test is totally skewed to what works best for the type of query you are doing, and on a single instance cluster. If your query is selective based on an ordered column, then small files will result in processing the least data on the single node. But when you randomize the data into many files, you see what it takes to do a full table scan.
You have to take into account how much parallelism your cluster provides and the total mix of query types. Data layout can be optimized for highly selective queries at the expense of write performance. However, doing large range scans or full table analytics will be pathologically bad on such a layout.

Oliver Angelil

07/31/2023, 8:12 PM
"Hahhah Nobody said anything about a single 345GB parquet." Oh I think I misunderstood the recommendation in the docs. "Not to partition tables less than 1TB", means not having the hive-style
.partionBy
subdirectory style of storage on disc.. and it doesn't mean all data in 1 parquet file...
I'm learning... 🙂

Dominique Brezinski

07/31/2023, 8:12 PM
Ha, yes exactly—no hive-style partitioning

Oliver Angelil

07/31/2023, 8:13 PM
That's exactly why I was so dumbstruck by that part of the docs... let's hope other users are not as foolish as me
This new understanding is enlightening!

Dominique Brezinski

07/31/2023, 8:23 PM
Partitioning means separating data files by the value of a specific column (or tuple of columns), so for instance if there is a date column that is a partition column, each data file will only include data for a single date value. In Delta the partition value, say a specific date in this example, is also stored in the log for each file. When your query references a specific date, the log is processed and only the data files whose partition matches that date are read. Partitions are generally used on lower cardinality columns, otherwise the partitions tend to fragment the data in each write into small files, and even with compaction each partition still has only small files in it. If you only ever query one partition at a time (again a single date in this example), then this will give you a fast result because only a tiny bit of data is read. However, if you have to query a large range of partitions, performance will be very bad due to the suboptimal throughput you get from the storage system when reading many small files.
So in general, it is better to use reasonably large file sizes to achieve good throughput from the storage system, and to maximize the utility of min/max column stats by ordering on columns that are commonly used to select data. Then the query predicate is evaluated against the min/max stats for each file in the table, and only files where the answer can lie between the min and max will be read and processed. This is why ingest time clustering is very effective for time series data where ingest time is close to event time: the min/max on the timestamp will be tight enough to filter by time effectively.
And don’t artificially restrict the data layout by partitioning.
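To make the contrast concrete, a sketch of the two write patterns being discussed (df is the daily batch; paths are hypothetical):

# Hive-style partitioning: one subdirectory per (Year, Month, Day) value,
# which on a <1TB table tends to fragment each write into many small files.
(df.write.format("delta")
   .mode("append")
   .partitionBy("Year", "Month", "Day")
   .save("/mnt/partitioned-table"))

# Unpartitioned layout: append plainly and rely on OPTIMIZE ZORDER BY (or
# ingestion-time clustering) to keep per-file TimeStamp min/max ranges tight.
(df.write.format("delta")
   .mode("append")
   .save("/mnt/unpartitioned-table"))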

Oliver Angelil

07/31/2023, 8:32 PM
That's a clear explanation. I'm soaking up every word! Thank you for teaching me!

Dominique Brezinski

07/31/2023, 8:34 PM
No problem!

Oliver Angelil

08/01/2023, 9:30 AM
Even for a query spanning a window of 1 year, the larger file sizes are slower. Here I am comparing the same query between two storage structures for the same table (345 GB):
• The old hive-style layout (partitioned by Year, Month, Day), file sizes ~150mb: query took 3 minutes
• The new non-partitioned, z-ordered layout, with file sizes ~260mb: query took 5 minutes. Hoping for Ingestion-Time clustering to do its thing - using DBR 12.2, so it should be!
the query was:
from pyspark.sql import functions as F

df = spark.read.format('delta').load(...)
(df
 .filter(
     (F.col('TimeStamp') > '2021-06-19T01:20')
     & (F.col('TimeStamp') <= '2022-07-20T01:20'))
 ).count()

Dominique Brezinski

08/01/2023, 1:18 PM
Did you run this on a single node cluster again?

Oliver Angelil

08/01/2023, 1:30 PM
Yes. 1 node as in the screenshot I shared yesterday. I get the hint. Thanks!

Dominique Brezinski

08/01/2023, 3:17 PM
My point is not that using a single node cluster is "wrong," but rather that the results are only valid if you will only be querying the table with single node clusters of a similar resource type. You are likely to get different results on, say, a four-node i3.4xlarge cluster. This is the challenge with benchmarking as a practice: both the people doing the benchmarking and the consumers of the benchmark have to understand how the conditions of the measurements will affect them and what can be safely extrapolated from the results.
Also, if you look at the metrics, in the first query both the min and med file read sizes are actually bigger than in the second, but the number of files read is 50% more. 150MB correlates better with the min file size, whereas in the second set of metrics the min is much smaller. Was the first table compacted or z-ordered? Was the second table compacted or z-ordered?
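For what it's worth, a quick way to check that is the table history via DeltaTable.history(); a sketch with a hypothetical path:

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/mnt/my-table")  # hypothetical path
(dt.history()
   .select("version", "timestamp", "operation", "operationParameters")
   .where("operation = 'OPTIMIZE'")  # covers both plain compaction and ZORDER BY
   .show(truncate=False))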

Oliver Angelil

08/01/2023, 8:11 PM
• In the 1st table (hive style), I don't see any indication of compaction or z-ordering from looking at the delta log .json files. I just see "add"s: {"add":{"path":"SiteNumber=BE01/Year=2023/Month=8/Day=1/part-00004-2dfc8f9a-1f94-4d87-881a-5f970cb00451.c000.snappy.parquet". The files are simply written into daily bins.
• In the 2nd, I am z-ordering on TimeStamp, with targetFileSize=268mb.
yeah, good spot re the min and med file sizes being bigger in the first case... I will continue to experiment. this is fun 😄
@Dominique Brezinski in case you're interested, the missing piece of the puzzle was not the optimisation I was trying but rather where I was saving the data! For exactly the same data structure, my query times were slower (almost double the time) for data saved to /dbfs/bronze than for the same data saved to /dbfs/mnt/mip-chocolate! There is something about the disk I/O read/write times for the storage locations with the bucket icons. I don't know why, but yeah...