
Roel Knitel

06/27/2023, 9:43 AM
Trying to incrementally upsert changes into a Delta table using autoloader and foreachBatch as described here: https://docs.databricks.com/structured-streaming/delta-lake.html#upsert-from-streaming-queries-using-foreachbatch. It seems my updated records get appended rather than updated, whatever I try. Should the initial Delta table be created with certain properties? When I use outputMode('Update') in the writeStream I get "Data source com.databricks.sql.transaction.tahoe.sources.DeltaDataSource does not support Update output mode".
def upsert_data(microBatchOutputDF, batchId):
    deltaTable = DeltaTable.forPath(spark, "<some-abfss-path-silver>")
    (deltaTable.alias("data")
        .merge(
            df.alias("newData"),
            "newData.entryNo = data.entryNo")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

def autoload(datasource, source_format, table_name, datatarget):
    query = (spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', source_format)
        .option('inferSchema', 'True')
        .option('cloudFiles.schemaLocation', datasource + '/_checkpoint/')
        .load(datasource)
        .writeStream
        .trigger(once=True)
        .foreachBatch(upsert_data)
        .format('delta')
        .outputMode('update')
        .queryName("data incrementeeel laden van brons naar zilver")
        .option('checkpointLocation', datasource + '/_checkpoint/')
        .start(datatarget)
    )
    return query

autoload("<some-abfss-path-bronze>", "csv", "Entries", "<some-abfss-path-silver>")

Dominique Brezinski

06/27/2023, 6:55 PM
Using foreachBatch you can ignore outputMode. foreachBatch decouples the dataframe from the stream output. In the code you posted, microBatchOutputDF is not used. Is there missing code where it is transformed/renamed into “df”?
The behavior you describe would happen if your merge predicate is never true.
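For reference, a minimal sketch of the handler with that one change applied (same silver path and entryNo merge key as in your snippet; the only difference is that the merge source is microBatchOutputDF, the dataframe foreachBatch passes in):

from delta.tables import DeltaTable

def upsert_data(microBatchOutputDF, batchId):
    # Use the micro-batch dataframe handed to this function as the merge source,
    # not an outer/undefined "df".
    deltaTable = DeltaTable.forPath(spark, "<some-abfss-path-silver>")
    (deltaTable.alias("data")
        .merge(
            microBatchOutputDF.alias("newData"),
            "newData.entryNo = data.entryNo")
        .whenMatchedUpdateAll()      # update existing rows with the same entryNo
        .whenNotMatchedInsertAll()   # insert genuinely new rows
        .execute()
    )

Since the merge inside the batch function does the actual write, the outputMode('update') and format('delta') sink settings on the writeStream shouldn't be needed either.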

Roel Knitel

06/28/2023, 6:33 AM
Thanks for pointing that out, Dominique! Totally missed that. Got it working now 🙂 thanks again
👍 1