
Patrik Ekman

05/11/2023, 7:20 AM
Hello! I am wondering how you guys have handled delta streaming + eventual schema updates (changing column data types). Has your solution been to simply delete checkpoints/create a new checkpoint mirroring the new schema, or have you found some other nice workaround? It's in the classic bronze-silver-gold Azure platform, with delta streaming happening between all the zones. Thank you!

Karolina A

05/13/2023, 3:22 PM
This has been a thorn in my side, actually. There's no good way that I've found, but the hacky way is as follows:
• stop any running tasks involving the Delta source table and checkpoint directory
• assert that the checkpoints have fully consumed everything up to now from the Delta source table; if they haven't, you can't migrate, because the next step will lead to data loss
🙌 1
• do the schema migration -- this creates a new version of the Delta table
• write some commit/offset file pairs in the checkpoint directory to act as if the version after the schema update has also been consumed. This is hacky; I have written some code to write those text files -- I didn't find anything in PySpark or Delta to do it
• after this is done, restart any tasks. The checkpoints will skip over the schema migration and process microbatches only when new data is added to the table
🙌 1
We also have the bronze-silver-gold architecture, so generally we stop the entire pipeline, migrate all tables, write the new checkpoint file pairs and restart the pipeline
🙌 1
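The "write some commit/offset file pairs" step could be sketched roughly as below. This is not Karolina's actual code -- it assumes the usual Structured Streaming checkpoint layout (`offsets/` and `commits/` subdirectories with files named by batch id) and the Delta source offset fields (`reservoirId`, `reservoirVersion`, `index`, `isStartingVersion`) as seen when inspecting checkpoints; the format is internal and undocumented, so verify against your own checkpoint files first.

```python
import json
from pathlib import Path

def mark_version_consumed(checkpoint_dir, reservoir_id, new_version):
    """Append an offset/commit file pair so the stream treats `new_version`
    (e.g. the table version created by a schema migration) as already read.

    Sketch only: assumes <checkpoint>/offsets/<batchId> holds a "v1" header
    line, a batch-metadata JSON line, and one offset JSON line per source.
    """
    cp = Path(checkpoint_dir)
    offsets, commits = cp / "offsets", cp / "commits"
    offsets.mkdir(parents=True, exist_ok=True)
    commits.mkdir(parents=True, exist_ok=True)

    # Files are named by batch id; the next batch is (largest existing) + 1.
    batch_ids = [int(p.name) for p in offsets.iterdir() if p.name.isdigit()]
    next_batch = max(batch_ids, default=-1) + 1

    offset = {
        "sourceVersion": 1,
        "reservoirId": reservoir_id,      # the source Delta table's unique id
        "reservoirVersion": new_version,  # table version marked as consumed
        "index": -1,
        "isStartingVersion": False,
    }
    (offsets / str(next_batch)).write_text(
        "v1\n"
        + json.dumps({"batchWatermarkMs": 0, "batchTimestampMs": 0}) + "\n"
        + json.dumps(offset) + "\n"
    )
    # The matching commit file is what tells the stream the batch completed.
    (commits / str(next_batch)).write_text('v1\n{"nextBatchWatermarkMs":0}\n')
    return next_batch
```

With the stream stopped, calling this once per migrated table (then restarting) mirrors the procedure described above.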

Patrik Ekman

05/15/2023, 7:07 AM
Thank you for answering! What's the motivation for "messing" with the checkpoint files rather than removing them and starting a new checkpoint? I suppose it's something about keeping the offset so that you don't have to read everything from the start once restarting? 🙂

Kashyap Bhatt

05/15/2023, 2:47 PM
> I have written some code to write those text files.
@Karolina A, could you share this code?

Karolina A

05/17/2023, 6:50 PM
> Thank you for answering! What's the motivation for "messing" with the checkpoint files rather than removing them and starting a new checkpoint? I suppose it's something about keeping the offset so that you don't have to read everything from the start once restarting?
Yeah, we do incremental processing, so we don't want to reprocess months' worth of data again whenever we change the schema slightly
✅ 1

Kashyap Bhatt

05/17/2023, 7:06 PM
Any chance you can share that code, @Karolina A?

Karolina A

05/17/2023, 7:10 PM
No, I can't share it, as it's not open source. But if you go to the checkpoint directories, you will see that the offsets are plain JSON files which refer to a version that they've processed. This can be changed
šŸ‘šŸ½ 1
šŸ‘ 1

Patrik Ekman

06/07/2023, 9:39 AM
Cool, thank you so much Karolina! Are there any considerations on e.g. the reservoirVersion field in the offset file?

Karolina A

06/13/2023, 11:07 AM
The reservoirVersion is the version of the source Delta table up to which the data has been marked as processed by the checkpoints (roughly). I haven't found the spec
I think only if a corresponding commit file exists can you know that the data has been processed and committed up to that version
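That check -- an offset only counts as consumed once its matching commit file exists -- could be sketched like so (same caveat as above: the checkpoint layout is an internal format, so treat this as illustrative):

```python
import json
from pathlib import Path

def last_committed_version(checkpoint_dir):
    """Return the reservoirVersion of the newest batch that has BOTH an
    offsets file and a commits file. A batch with only an offsets entry was
    planned but never confirmed complete, so it doesn't count as consumed."""
    cp = Path(checkpoint_dir)

    def batch_ids(sub):
        d = cp / sub
        return {int(p.name) for p in d.iterdir() if p.name.isdigit()} if d.is_dir() else set()

    done = batch_ids("offsets") & batch_ids("commits")
    if not done:
        return None  # nothing fully committed yet
    lines = (cp / "offsets" / str(max(done))).read_text().splitlines()
    return json.loads(lines[-1]).get("reservoirVersion")
```

Running this before a migration gives the "everything consumed up to now" assertion from the first step of the procedure above.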