
Gnanasoundari Soundarajan

04/04/2023, 7:56 AM
How do I load the set of files added between two versions of a Delta Lake table and process them?

JosephK (exDatabricks)

04/04/2023, 11:24 AM
What are you trying to do? What you're asking is possible, since you just need to read the transaction log for the list of files at each version and diff them.
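
(A rough sketch of the "read the logs and diff" idea, in Scala/Spark as in the rest of the thread. The table path, version variables, and the direct Parquet read of the added files are illustrative assumptions; the only part taken from Delta itself is the documented transaction-log layout, one JSON action per line in _delta_log/<zero-padded 20-digit version>.json.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-log-diff").getOrCreate()
import spark.implicits._

// Illustrative values; in a real job these would come from the scheduler's state.
val path = "/data/sensors_delta"
val lastProcessedVersion = 3L
val latestVersion = 10L

// Commit files are named with zero-padded 20-digit version numbers.
val commitFiles = ((lastProcessedVersion + 1) to latestVersion)
  .map(v => f"$path/_delta_log/$v%020d.json")

// "add" actions carry the table-relative paths of data files added in each commit;
// rows for other actions (commitInfo, remove, ...) have a null "add" column.
val addedFiles = spark.read.json(commitFiles: _*)
  .where("add IS NOT NULL")
  .select($"add.path")
  .as[String]
  .collect()

// Read only the newly added data files. Note this ignores removes/updates,
// so it is fragile compared to the streaming approach discussed below.
val newData = spark.read.parquet(addedFiles.map(p => s"$path/$p"): _*)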

Gnanasoundari Soundarajan

04/04/2023, 11:53 AM
The Delta table contains data coming from various sensors. I have a scheduled job that runs once an hour, reads the Delta table, transforms the data, and writes it out in a different format for batch processing. I would like to pick up only the files that were added after the version I last processed, so that each run works only on the data added since my last batch. Will the code below work?
val df = spark.read.format("delta")
  .option("startingVersion", lastProcessedVersion)
  .option("endingVersion", latestVersion)
  .load(path)

JosephK (exDatabricks)

04/04/2023, 12:30 PM
So what you're looking for is structured streaming. That will read only new files. If you're on Databricks, you can use Auto Loader too.

Gnanasoundari Soundarajan

04/04/2023, 1:04 PM
I have tried the above code, but it looks like it takes the snapshot and processes the same set of data on each run. For example: Run 1 processed sensor X data with timestamp Y, with the Delta table at snapshot version 3. Afterwards no data came in for sensor X, a few files were added for other sensors, and the version incremented to 10. Run 2 then processes the data from versions 4 to 10. The expectation is that it should not reprocess sensor X's data with timestamp Y, since no files for it were added between Run 1 and Run 2. When I checked, the sensor X data was duplicated on every run, regardless of the startingVersion I specified. Note: I am using "io.delta" %% "delta-core" % "2.2.0".

JosephK (exDatabricks)

04/04/2023, 3:01 PM
Yeah, the code doesn't matter because it won't accomplish what you want. You want to use structured streaming
👍 1
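
(For reference, a minimal sketch of the structured streaming approach recommended above, assuming Spark 3.3 with delta-core 2.2.0 and placeholder paths. The checkpoint is what remembers which files have already been processed, so an hourly run picks up only newly added data. The startingVersion/endingVersion options appear to be honored only by streaming or Change Data Feed reads, not by a plain batch read, which would explain why the full snapshot was reprocessed each time.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("sensor-export").getOrCreate()

// Placeholder paths for illustration.
val sourcePath = "/data/sensors_delta"
val checkpointPath = "/checkpoints/sensor_export"
val outputPath = "/exports/sensors"

val query = spark.readStream
  .format("delta")
  .load(sourcePath)
  // apply the hourly transformation here, e.g. .select(...) / .withColumn(...)
  .writeStream
  .format("parquet")                              // or whatever format the batch consumers need
  .option("checkpointLocation", checkpointPath)   // tracks which files have already been processed
  .trigger(Trigger.AvailableNow())                // drain whatever is new, then stop (Spark 3.3+)
  .start(outputPath)

query.awaitTermination()

Scheduling this hourly keeps the batch-like behaviour: each run drains only the data added since the previous run and then exits.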