Ajex

03/16/2023, 6:47 AM
Hi team, I am using Delta 2.1.1 to perform optimization with Z-order. This is my function:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

def zorder(spark: SparkSession, opts: AppOptions): Unit = {
    val path = opts.getString("path")
    val partitionCondition = opts.getString("partition_condition")
    val optimizeCols = opts.getString("optimize_cols").split(",").map(_.trim)

    // Z-order the files that match the partition condition
    val table = DeltaTable.forPath(spark, path)
    table.optimize()
      .where(partitionCondition)
      .executeZOrderBy(optimizeCols: _*)

    // Shorten retention so superseded files can be vacuumed right away;
    // the duration check must be disabled because 1 minute is below the 7-day default
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", false)
    spark.sql(s"ALTER TABLE delta.`${path}` SET TBLPROPERTIES('delta.deletedFileRetentionDuration' = 'interval 1 minute')")
    spark.sql(s"ALTER TABLE delta.`${path}` SET TBLPROPERTIES('delta.logRetentionDuration' = 'interval 1 day')")
    Thread.sleep(90000) // sleep 1.5 minutes before vacuum
    table.vacuum()
  }
The problem is that the table I need to optimize has a lot of small files, around 400, because a batch job writes every 15 minutes and the output is further split across 3 sources. When I try to optimize with Z-order, it uses a lot of resources and I still get the following error: "Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 15 (run at ForkJoinTask.java:1402) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 2 partition 0".
[full error attached]

JosephK (exDatabricks)

03/16/2023, 10:56 AM
You can try to optimize it by partition
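A minimal sketch of what partition-by-partition optimization could look like with the Delta Scala API; the helper name, the date column, and the list of dates are assumptions for illustration only:

// Hypothetical helper: compact and Z-order one date partition at a time,
// so each OPTIMIZE job only has to shuffle a single partition's worth of data.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

def optimizePartitionByPartition(spark: SparkSession, path: String,
                                 dates: Seq[String], zorderCols: Seq[String]): Unit = {
  val table = DeltaTable.forPath(spark, path)
  dates.foreach { d =>
    table.optimize()
      .where(s"date = '$d'")                // assumes a `date` partition column
      .executeZOrderBy(zorderCols: _*)
  }
}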

Kashyap Bhatt

03/16/2023, 2:15 PM
We use delta.autoOptimize.autoCompact for our tables, so although we have lots of small files to begin with (we also have data coming in every 15 mins), they keep getting compacted automatically as part of the periodic upserts (you can see it in the table history).
Not at all sure if it'll help your situation, but worth a try.
It's the equivalent of running OPTIMIZE <table> every now and then. So to check whether it'll help, you can run OPTIMIZE <table> on your table before you run OPTIMIZE <table> ZORDER BY <col1>.
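A minimal sketch of that compact-first-then-Z-order check, reusing the path, partitionCondition and optimizeCols values from the zorder function above (an illustrative sequence, not a confirmed fix):

// Run a plain bin-packing compaction first, then Z-order,
// both limited to the same partition filter.
val table = DeltaTable.forPath(spark, path)
table.optimize().where(partitionCondition).executeCompaction()
table.optimize().where(partitionCondition).executeZOrderBy(optimizeCols: _*)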

Ajex

03/16/2023, 2:22 PM
@JosephK (exDatabricks) my table is partitioned by date, about 100 million per source per day (300 million/day), and I already run the optimize with a date condition (on the partition column).
@Kashyap Bhatt Thank you, I will try the solution you suggested. But I don't see this config (delta.autoOptimize.autoCompact) mentioned in the Delta OSS docs.

Kashyap Bhatt

03/16/2023, 3:02 PM
Hmm, I think you're right. This might be a Databricks runtime feature not available in Delta OSS.
As I mentioned though, AFAIK it's the same as running OPTIMIZE <table> periodically; in your case, run it before running ZORDER. Though in theory OPTIMIZE <table> ZORDER BY <col1> should compact the files and then do the Z-ordering.

Ajex

03/16/2023, 3:20 PM
I understand that your solution will work well, but how do I deal with this messy bundle of numerous small files that have already been written out?

Kashyap Bhatt

03/16/2023, 3:24 PM
I understand that your solution will work
Thanks, you have more confidence than I do. 🙂
how do I deal with this messy bundle of numerous small files that have already been written out
OPTIMIZE <table> will combine "numerous small files" into "few big files" (based on your partitioning columns etc.).
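As an illustration, a one-off compaction of the files that are already on disk, followed by a quick check of the table history, could look like this (the date literal is a placeholder, and path is assumed to hold the table path as in the zorder function above; Delta OSS 2.x supports the OPTIMIZE SQL command):

// Compact the existing small files in one partition, then confirm in the
// history that the OPTIMIZE operation removed and added files.
spark.sql(s"OPTIMIZE delta.`$path` WHERE date = '2023-03-15'")
spark.sql(s"DESCRIBE HISTORY delta.`$path`").show(false)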

Ajex

03/16/2023, 3:35 PM
I will try your solution, thank you.