Jordan Cuevas
05/04/2023, 2:39 PM
Joydeep Banik Roy
05/04/2023, 6:05 PM
Jordan Cuevas
05/04/2023, 6:09 PM
Joydeep Banik Roy
05/05/2023, 10:49 AM
Jordan Cuevas
05/05/2023, 2:22 PM
final_table = spark.sql(query_main4).repartition('orderid_group')
# Expose the result to SQL as a temp view and mark it for caching; the
# count() that follows is an action over the frame and feeds the progress log.
final_table.createOrReplaceTempView("final_table")
final_table.cache()
x = final_table.count()
print(f"final_table: {x}; {datetime.now().strftime('%H:%M:%S')}")

# orders_part3 is not referenced again in this section, so release it.
orders_part3.unpersist()

# Write out to the Delta table. Both branches target the same location.
target_path = f"s3a://{transformed_delta_path}/{final_table_name}"

if run_type == 'intraday':
    # Intraday runs upsert into the existing table.
    # Author's notes: this branch hangs; 'final_table' has ~80k rows; the
    # target is partitioned by 'orderid_group'; 'orderid' is a bigint.
    # NOTE(review): the condition compares orderid_group column-to-column only.
    # Delta's merge tuning guidance suggests adding literal partition
    # predicates so the target scan can be pruned -- worth confirming whether
    # that applies to this hang.
    merge_condition = "a.orderid_group=b.orderid_group and a.orderid=b.orderid"
    dt = DeltaTable.forPath(spark, target_path)
    merge_builder = dt.alias("a").merge(
        source=final_table.alias("b"),
        condition=merge_condition,
    )
    merge_builder = merge_builder.whenMatchedUpdateAll()
    merge_builder = merge_builder.whenNotMatchedInsertAll()
    merge_builder.execute()
else:
    # Every other run type replaces the table (and its schema) in full.
    # Author's note: this branch runs daily without issue.
    writer = final_table.write.format("delta")
    writer = writer.mode('overwrite').option('overwriteSchema', "true")
    writer = writer.partitionBy(partition_columns)
    writer.save(target_path)
Joydeep Banik Roy
05/05/2023, 6:51 PM
Jordan Cuevas
05/05/2023, 6:56 PM
Joydeep Banik Roy
05/05/2023, 7:01 PM
spark.sql.autoBroadcastJoinThreshold=-1
Jordan Cuevas
05/05/2023, 7:03 PM
Joydeep Banik Roy
05/05/2023, 7:03 PM
Jordan Cuevas
05/05/2023, 8:56 PM