#random

Barak Haver

03/08/2023, 12:32 PM
Hi, not exactly a Delta question, but I was hoping to get some input here (I couldn’t find reading material about it). I have the following two options (as an example). They should be reading the exact same data and writing the exact same data, but one transforms the data outside the batch and then writes it using the foreachBatch method, while the other transforms and writes inside the batch function. Are there side effects, issues, or other things worth taking into consideration? Thanks! Example:
Couldn’t remove the image, so putting the code itself here:
from delta.tables import DeltaTable

df_og = spark.readStream.format("delta").load("/some/data/...")
deltaTable = DeltaTable.forPath(spark, "/some/path/to_write_to")

def transformation_handler(df):
  transformed_df = df  # Some transformations here...
  return transformed_df

# Option 1: transform on the streaming DataFrame, only upsert inside the batch function
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t")
     .merge(microBatchOutputDF.alias("s"), "s.key = t.key")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

df = df_og.transform(transformation_handler)
df.writeStream.format("delta").foreachBatch(upsertToDelta)...

# Option 2: transform and upsert inside the batch function
def transform_and_upsertToDelta(microBatchOutputDF, batchId):
  transformed_df = microBatchOutputDF.transform(transformation_handler)
  (deltaTable.alias("t")
     .merge(transformed_df.alias("s"), "s.key = t.key")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

df_og.writeStream.format("delta").foreachBatch(transform_and_upsertToDelta)...
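The trailing "..." in both options stands for the rest of the writeStream chain, which the question leaves out. A typical completion (shown here for Option 2; the checkpoint path is only a placeholder, not something from the original question) would look roughly like this:

(df_og.writeStream
   .format("delta")
   .foreachBatch(transform_and_upsertToDelta)
   .outputMode("update")
   .option("checkpointLocation", "/some/path/to_checkpoint")  # placeholder path
   .start())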

Gal Stainfeld

03/26/2023, 8:16 AM
In general, the operations allowed on a streaming DataFrame are a subset of those allowed on a regular (static) one, like the micro-batch DataFrame you get inside foreachBatch. Sorting, aggregations without watermarking, joins without watermarking, and some types of window functions are not supported as transformations on streaming DataFrames. For things outside that list that are acceptable in both places, like converting timestamp formats, adding columns, regular when/otherwise logic, etc., I’d apply them on the streaming DataFrame if one of the two points below matters to you:
1. You’re worried the transformation logic is complex and time consuming when applied on a whole micro-batch (accumulated events) rather than on events as they arrive, which is considered faster.
2. Changes to the transformation logic can be applied to new micro-batches without needing to restart the streaming app. This is relevant only if you use some kind of rolling deployment (or blue-green deployment) that lets you “switch” traffic without restarting.
Hope this answers your questions; if not, let’s talk in private.
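To make the “subset of operations” point concrete, here is a minimal sketch (reusing the path and key column from the question; everything else is assumed) of an operation that is rejected on the streaming DataFrame but works inside foreachBatch, where each micro-batch arrives as a static DataFrame:

stream_df = spark.readStream.format("delta").load("/some/data/...")

# On the streaming DataFrame this is rejected: starting a query that sorts a
# streaming DataFrame fails with an AnalysisException (sorting is only allowed
# on aggregated streaming data in complete output mode).
# stream_df.orderBy("key").writeStream...

# Inside foreachBatch the micro-batch is a static DataFrame, so the same
# operation is allowed:
def upsert_sorted(microBatchOutputDF, batchId):
    sorted_df = microBatchOutputDF.orderBy("key")  # fine here
    # ... merge/write sorted_df as in the examples above ...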