
Athi

01/13/2023, 12:46 AM
Hello folks, I need your thoughts on how to resolve this. My driver code initiates multiple write streams (as in the code below). The code exits after all write streams initialize and does not wait for them to complete. Is there a way to block further execution in the driver script until all the streams have completed?
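for i in list:
  # Parentheses let the chained calls span several lines in Python
  sq = (events.writeStream
              .trigger(once=True)  # PySpark spelling of Trigger.Once
              .format("delta")
              .outputMode("append")
              .option("checkpointLocation", "/tmp/delta/events/_checkpoints/" + i)
              .start(output_path))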

Jon Stockham

01/13/2023, 9:19 AM
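streams = []

for i in list:
  # Parentheses let the chained calls span several lines in Python
  sq = (events.writeStream
              .trigger(once=True)  # PySpark spelling of Trigger.Once
              .format("delta")
              .outputMode("append")
              .option("checkpointLocation", "/tmp/delta/events/_checkpoints/" + i)
              .start(output_path))

  # Keep a handle to each started query so the driver can wait on it later
  streams.append(sq)

# Block the driver until every query has terminated
for s in streams:
  s.awaitTermination()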
Something like this should do the trick.
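One caveat with the loop above: in PySpark, `awaitTermination()` re-raises the error of a query that terminated with an exception, so the first failed stream would stop the driver from waiting on the rest. Below is a rough sketch of a helper that waits on every query and collects the failures instead. The name `await_all` is made up for illustration, and it only assumes that each object exposes `awaitTermination()` the way a `StreamingQuery` does.

```python
from typing import Iterable, List, Tuple


def await_all(queries: Iterable) -> List[Tuple[object, Exception]]:
    """Wait for every query to terminate; return (query, error) pairs for failures.

    Hypothetical helper: relies only on each object having awaitTermination().
    """
    failures = []
    for q in queries:
        try:
            # Blocks until this query stops; raises if it terminated with an error
            q.awaitTermination()
        except Exception as err:
            # Record the failure and keep waiting on the remaining queries
            failures.append((q, err))
    return failures
```

With the `streams` list from the snippet above, `failures = await_all(streams)` would return an empty list when every stream finished cleanly, and the driver could raise or log whenever it is non-empty.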
👍 1

Kashyap Bhatt

01/13/2023, 2:26 PM
Your code might exit, but the job/notebook would await termination of all pipelines irrespective of trigger (Once or AvailableNow).
👍 1

Athi

01/13/2023, 2:27 PM
That's what happened.

Kashyap Bhatt

01/13/2023, 2:28 PM
So you might be able to fix this either by doing what Jon suggested, or by changing your orchestration logic so it doesn't start a new job until the old job is complete.
👍 1

Athi

01/13/2023, 2:28 PM
Sure. Let me try what Jon suggested today.