Barak Haver
03/08/2023, 12:32 PM
foreachBatch method, and the other transforms and writes inside the batch function.
Are there any side effects, issues, or other things worth taking into consideration?
Thanks!
Example:

```python
from delta.tables import DeltaTable

# Assumes an active SparkSession named `spark` (e.g. in a notebook).
df_og = spark.readStream.format("delta").load("/some/data/...")
deltaTable = DeltaTable.forPath(spark, "/some/path/to_write_to")

def transformation_handler(df):
    transformed_df = df  # Some transformations here...
    return transformed_df

# Option 1: transform the streaming DataFrame, only merge inside foreachBatch.
# The batch function must be defined before it is passed to foreachBatch.
def transform_and_upsertToDelta(microBatchOutputDF, batchId):
    (deltaTable.alias("t")
        .merge(microBatchOutputDF.alias("s"), "s.key = t.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

df = df_og.transform(transformation_handler)
df = df.writeStream.format("delta").foreachBatch(transform_and_upsertToDelta)...

# Option 2: transform and merge inside foreachBatch.
def transform_and_upsertToDelta(microBatchOutputDF, batchId):
    transformed_df = microBatchOutputDF.transform(transformation_handler)
    (deltaTable.alias("t")
        .merge(transformed_df.alias("s"), "s.key = t.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

df_og.writeStream.format("delta").foreachBatch(transform_and_upsertToDelta)...
```
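To make the comparison concrete without a Spark cluster, here is a toy pure-Python model. It is an assumption for illustration, not Spark's or Delta's API: each micro-batch is a list of row dicts, the target table is a dict keyed by `key`, `upsert` is a hypothetical stand-in for the MERGE with `whenMatchedUpdateAll`/`whenNotMatchedInsertAll`, and `transformation_handler` here is a hypothetical stateless, row-wise transformation. For that kind of transformation, Option 1 and Option 2 end up writing the same target.

```python
# Toy model (NOT Spark): micro-batches are lists of row dicts and the
# Delta target table is a dict keyed by "key". All names are hypothetical.

def transformation_handler(rows):
    # Hypothetical stateless, row-wise transformation: derive a new column.
    return [{**r, "value_x2": r["value"] * 2} for r in rows]

def upsert(target, rows):
    # Stand-in for MERGE ... whenMatchedUpdateAll / whenNotMatchedInsertAll:
    # rows with an existing key overwrite it, rows with a new key are inserted.
    for r in rows:
        target[r["key"]] = dict(r)

def option_1(batches):
    # Option 1: the transformation is part of the stream, applied lazily per
    # micro-batch; the batch function only merges already-transformed rows.
    target = {}
    transformed_stream = (transformation_handler(b) for b in batches)
    for batch in transformed_stream:
        upsert(target, batch)
    return target

def option_2(batches):
    # Option 2: the batch function receives raw rows and transforms them itself.
    target = {}
    for batch in batches:
        upsert(target, transformation_handler(batch))
    return target

batches = [
    [{"key": 1, "value": 10}, {"key": 2, "value": 20}],
    [{"key": 2, "value": 25}, {"key": 3, "value": 30}],
]
print(option_1(batches) == option_2(batches))
```

The equivalence only covers stateless transformations. In real Spark, anything applied before `writeStream` must be supported on a streaming DataFrame, and stateful operations there (aggregations, `dropDuplicates`) keep state across micro-batches; inside `foreachBatch`, `microBatchOutputDF` is an ordinary batch DataFrame, so the same operations see only the current micro-batch.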
Gal Stainfeld
03/26/2023, 8:16 AM