Roel Knitel
06/27/2023, 9:43 AM
from delta.tables import DeltaTable

def upsert_data(microBatchOutputDF, batchId):
    # Merge each micro-batch into the silver Delta table, keyed on entryNo
    deltaTable = DeltaTable.forPath(spark, "<some-abfss-path-silver>")
    (deltaTable.alias("data")
        .merge(
            microBatchOutputDF.alias("newData"),
            "newData.entryNo = data.entryNo")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
def autoload(datasource, source_format, table_name, datatarget):
    # Read new files incrementally with Auto Loader and upsert each micro-batch into silver
    query = (spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', source_format)
        .option('inferSchema', 'True')
        .option('cloudFiles.schemaLocation', datasource + '/_checkpoint/')
        .load(datasource)
        .writeStream
        .trigger(once=True)
        .foreachBatch(upsert_data)
        .format('delta')
        .outputMode('update')
        .queryName("incrementally loading data from bronze to silver")
        .option('checkpointLocation', datasource + '/_checkpoint/')
        .start(datatarget)
    )
    return query
autoload("<some-abfss-path-bronze>", "csv", "Entries", "<some-abfss-path-silver>")
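For reference, a minimal sketch (not part of the thread) of how the foreachBatch callback is exercised: Structured Streaming passes each micro-batch DataFrame plus a batch ID to upsert_data, so the merge must reference that DataFrame argument rather than an outer df. The column names and the direct call below are illustrative assumptions, and it presumes the silver Delta table referenced in upsert_data already exists.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build a small DataFrame standing in for one micro-batch (illustrative columns)
sample_batch = spark.createDataFrame(
    [(1, "first entry"), (2, "updated entry")],
    ["entryNo", "description"],
)

# Invoke the callback the same way Structured Streaming would for one micro-batch
upsert_data(sample_batch, 0)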
Dominique Brezinski
06/27/2023, 6:55 PM
Roel Knitel
06/28/2023, 6:33 AM