
Jordan Cuevas

05/04/2023, 2:39 PM
I have a partitioned delta table with about 10 million records, with the largest partition's parquet file coming in at ~170MB. This table gets rebuilt daily on a 15-node AWS Glue cluster (Spark 3.3.0, Delta 2.2.0) and takes about 10 minutes. Now I'm trying to merge intraday updates into this table using the same Glue cluster. There are about 80k updated records being created, and I repartition them using the same partition column as the final delta table before attempting to merge. The problem is that the merge operation is hanging, and I can't figure out why. I've allowed the merge stage to run for 45 minutes before killing it. I've increased the driver memory to 20GB hoping that would solve the issue, but no luck. Has anybody seen this behavior, or have any suggestions on what I can try to make this merge work?

Joydeep Banik Roy

05/04/2023, 6:05 PM
It might be an issue related to repartitionBeforeRewrite, check this => https://github.com/delta-io/delta/issues/1474
our jobs would hang/stall due to this issue, see if disabling the property works
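For context, a minimal sketch of what disabling that property before the merge could look like. The exact config key is an assumption based on the linked issue (spark.databricks.delta.merge.repartitionBeforeWrite.enabled in Delta 2.x); verify it against your Delta version.

# hedged sketch: disable Delta's repartition-before-write behavior for MERGE
# (config key assumed from the linked issue; confirm for your Delta version)
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "false")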

Jordan Cuevas

05/04/2023, 6:09 PM
thank you, I'll turn off that option in config and test again and report back
Unfortunately that didn't solve the issue. I'm still open to other solutions

Joydeep Banik Roy

05/05/2023, 10:49 AM
Can you share the code you are running and the executor spark config?

Jordan Cuevas

05/05/2023, 2:22 PM
Here is the relevant snippet of my code. 'final_table' is the dataframe being merged into the delta table when the job hangs. The dataframe has already been cached by this point and repartitioned to match the partitioning of the target delta table. Normally rewriting the full table of 10M rows takes seconds. I've also attached the spark config. I appreciate the feedback.
# imports needed to run this snippet
from datetime import datetime
from delta.tables import DeltaTable

final_table = spark.sql(query_main4).repartition('orderid_group')
final_table.createOrReplaceTempView("final_table")
final_table.cache()
x = final_table.count()  # action to materialize the cache
print(f"final_table: {x}; {datetime.now().strftime('%H:%M:%S')}")
orders_part3.unpersist()


#WRITE OUT TO DELTA TABLE
if run_type=='intraday':
    #THIS BLOCK HANGS
    #'final_table' HAS 80k ROWS
    #'dt' IS PARTITIONED BY 'orderid_group'; 'orderid' is a bigint
    dt = DeltaTable.forPath(spark, f"s3a://{transformed_delta_path}/{final_table_name}")
    (dt.alias("a")
       .merge(source=final_table.alias("b"), condition="a.orderid_group=b.orderid_group and a.orderid=b.orderid")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute()
    )
else:
    #THIS BLOCK RUNS DAILY WITHOUT ISSUE
    (final_table
        .write
        .format("delta")
        .mode('overwrite')
        .option('overwriteSchema', "true")
        .partitionBy(partition_columns)
        .save(f"s3a://{transformed_delta_path}/{final_table_name}")
    )

Joydeep Banik Roy

05/05/2023, 6:51 PM
config looks good, what is the size of this final table (the source)?
another problem that I can think of is that if final_table is too small, spark will invoke a broadcast join instead of a sort merge join
this step can get stuck sometimes
you can either increase the broadcast threshold or set it to -1 to make spark invoke a sort merge join mandatorily
can you check the physical plan in the spark DAG?

Jordan Cuevas

05/05/2023, 6:56 PM
final_table is about 80,000 rows. I'll try changing the broadcast threshold, but would that have any other performance implications or would it only affect this merge?
since this is running in Glue I don't have the Spark UI enabled currently, but I can turn it on next time I try and see what it shows

Joydeep Banik Roy

05/05/2023, 7:01 PM
check if it is a broadcast join; if yes, then try setting
spark.sql.autoBroadcastJoinThreshold=-1
this will ensure spark invokes a sort merge join
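A short sketch of applying that setting and checking the join strategy. spark.conf.set and DataFrame.explain are standard Spark APIs; the path and dataframe names are assumed from the snippet above, and the join here only approximates the one inside the MERGE.

# force a sort merge join by disabling broadcast joins for this session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# inspect the physical plan of the source/target join to confirm whether
# BroadcastHashJoin or SortMergeJoin is being chosen
target_df = spark.read.format("delta").load(f"s3a://{transformed_delta_path}/{final_table_name}")
target_df.join(final_table, on=["orderid_group", "orderid"]).explain()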

Jordan Cuevas

05/05/2023, 7:03 PM
thank you, I'll try that shortly and report back

Joydeep Banik Roy

05/05/2023, 7:03 PM
👍

Jordan Cuevas

05/05/2023, 8:56 PM
disabling the broadcast join had no effect. What I ended up doing was creating a dataframe from the delta table, removing the rows that exist in the source table, unioning the source dataframe to this filtered dataframe, and finally overwriting the delta table with this new dataframe. It now takes the same amount of time as my daily job that recreates the full table, but I should be able to run it on a smaller cluster.
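For reference, a minimal sketch of that anti-join-and-overwrite workaround, reusing variable and column names assumed from the earlier snippet:

# hedged sketch of the overwrite-based workaround described above
target_path = f"s3a://{transformed_delta_path}/{final_table_name}"
keys = ["orderid_group", "orderid"]

existing = spark.read.format("delta").load(target_path)                   # current table contents
kept = existing.join(final_table.select(keys), on=keys, how="left_anti")  # drop rows being replaced

(kept.unionByName(final_table)   # add the fresh intraday rows back in
     .write
     .format("delta")
     .mode("overwrite")
     .partitionBy("orderid_group")
     .save(target_path))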