
Markus Hube

09/12/2023, 1:10 PM
Hi all, I am facing a situation where I am using `df.writeStream.foreachBatch(...)` to merge against a Delta table. The merge is done on a uid column and an hour partition column, but when my data grows in size the query fails due to `spark.driver.maxResultSize`. If I understand the Spark plan correctly (see below), there is a broadcast join going on. It clearly makes sense to have merges without shuffles, but is there a way to bypass the driver, since this does not scale well? Thanks
== Physical Plan ==
AdaptiveSparkPlan (30)
+- == Current Plan ==
   HashAggregate (19)
   +- Exchange (18)
      +- HashAggregate (17)
         +- Filter (16)
            +- HashAggregate (15)
               +- Exchange (14)
                  +- HashAggregate (13)
                     +- Project (12)
                        +- BroadcastHashJoin Inner BuildLeft (11)
                           :- BroadcastQueryStage (6)
                           :  +- BroadcastExchange (5)
                           :     +- * Project (4)
                           :        +- * Filter (3)
                           :           +- * Filter (2)
                           :              +- * Scan ExistingRDD mergeMaterializedSource (1)
                           +- Filter (10)
                              +- Project (9)
                                 +- Project (8)
                                    +- Scan parquet  (7)
Looking further, I found this Stack Overflow post, which seems to describe the same behavior.
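
For reference, here is a minimal sketch of the foreachBatch merge pattern described above, written in PySpark with delta-spark. The paths, checkpoint location, and table layout are hypothetical; only the uid and hour merge keys come from the message. It is meant to show the setup that produces the plan above, not the poster's exact job.

# Minimal sketch of merging a micro-batch into a Delta table via foreachBatch.
# Assumed/hypothetical: source and target paths, checkpoint location, column names
# beyond uid and hour.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

def upsert_batch(batch_df: DataFrame, batch_id: int) -> None:
    target = DeltaTable.forPath(spark, "/path/to/target_table")  # hypothetical path
    (
        target.alias("t")
        .merge(
            batch_df.alias("s"),
            # Merge condition on the uid column plus the hour partition column,
            # so partition pruning can limit the files that need to be rewritten.
            "t.uid = s.uid AND t.hour = s.hour",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

(
    spark.readStream.format("delta")
    .load("/path/to/source")                                # hypothetical source
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "/path/to/checkpoint")    # hypothetical
    .start()
)

In this shape, the materialized merge source in each micro-batch is what shows up as `mergeMaterializedSource` on the build side of the BroadcastHashJoin in the plan above.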