Ans Fida
01/30/2023, 6:57 PM

Scott Sandre (Delta Lake)
01/30/2023, 9:23 PM

Krzysztof Chmielewski
01/30/2023, 9:34 PM
> Does Delta connector provide a checkpointing mechanism?
Yes, the Delta Flink Source connector participates in Flink's checkpoint mechanism for both bounded and continuous sources.
> I want to specify the timestamp or version that I should use to resume processing data from the Delta connector.
You don't need to do that. The Delta connector will use its state, which was checkpointed by Flink, to recover the job from the last known checkpoint automatically. For a bounded Delta source (batch jobs), during job recovery the connector will "know" from its checkpointed state which Delta snapshot version was originally processed and which files were already read. It will skip those already-processed files and resume reading from the ones that still need to be processed. For a continuous source (streaming jobs) the process is similar: the connector will resume from the last checkpointed Delta version and will skip the already-processed files for that version. Please let me know in case of further questions.
end-to-end exactly-once guarantee.
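For context, a minimal sketch of wiring a continuous Delta source into a Flink job; the table path and Hadoop configuration are placeholders, not values from this thread:

import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

public class ContinuousDeltaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Continuous mode: the source monitors the Delta table for new versions.
        // Its reading position (snapshot version, processed files) is stored in
        // Flink's checkpointed state, which is what enables automatic recovery.
        DeltaSource<RowData> deltaSource = DeltaSource
            .forContinuousRowData(
                new Path("s3a://my-bucket/my-delta-table"), // placeholder path
                new Configuration())
            .build();

        env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source")
            .print();

        env.execute("continuous-delta-source-job");
    }
}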
Ans Fida
01/30/2023, 9:41 PM
> Please keep in mind that files/delta versions that were processed between checkpoints will be reprocessed again after recovery.
FC: Flink checkpoint & DV: Delta version. So, if FC1 corresponds to DV10, and later after some time has passed we're at DV18 but Flink has not checkpointed again yet and the job failed, on recovery Delta will resume from DV10, correct @Krzysztof Chmielewski?
Krzysztof Chmielewski
01/30/2023, 10:23 PM
> once Flink checkpoints, Delta flink connector will make a corresponding checkpoint on it's side and later if the application was terminated and has to be restarted without the Flink checkpoint
Let me clarify: the Delta connector (source and sink) does not do any checkpoints on its own. It simply participates in Flink's checkpoint mechanism, meaning the Delta Source's inner state is included in Flink's state checkpoint. For streaming jobs, in order to make this work, your Flink job has to have checkpoints enabled [1]. Only then will your job resume processing as I described in my previous message. Without enabling the Flink checkpoint mechanism, for streaming jobs, the Delta Source will read the entire data again. For batch jobs the case is slightly different: for batch job recovery, Flink does not use checkpoints but still persists operator state. In this case no extra configuration is needed.

> FC: Flink checkpoint & DV: Delta version. So, if FC1 corresponds to DV10, and later after some time has passed we're at DV18 but Flink has not checkpointed again yet and the job failed, on recovery Delta will resume from DV10, correct
Yes, this is correct. That's why Flink's checkpoint interval should be tuned to mitigate any delays caused by reprocessing data after recovery. You have a very similar thing with the Kafka Source, where the Kafka offset is committed by the Flink Kafka connector back to the Kafka broker during a Flink checkpoint. So after recovery, Kafka resumes from the last known offset; in our case, the last known version.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/
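A minimal sketch of enabling Flink checkpointing for a streaming job, which is the prerequisite described above; the interval and storage path are placeholder values:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (placeholder value). A shorter interval
        // means less data to reprocess after recovery, at the cost of more
        // frequent checkpointing overhead.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Keep checkpoints in durable storage so the job can be restored from them.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
    }
}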
Ans Fida
01/30/2023, 10:30 PM
…flink had last checkpointed by specifying the startingVersion on Delta connector. Any ideas on how we can get that version number if we don't want to resume from the Flink checkpoint?

Krzysztof Chmielewski
01/30/2023, 10:35 PM
> we need to run the job with a new SQL and operator order
Ahh, SQL... 🙂 I know that "problem": it's the case when the job plan for SQL has changed and Flink cannot recover the state, right? BTW, you are mixing Table and DataStream API, right? I'm asking because SQL support for the Delta connector is under development right now.
Ans Fida
01/30/2023, 10:39 PM

Krzysztof Chmielewski
01/30/2023, 10:52 PM
> flink had last checkpointed by specifying the startingVersion on Delta connector.
Yes, the Delta Source builder has an API to specify the starting version. In case you don't have it already, please look for "startingVersion" under this link: https://delta-io.github.io/connectors/latest/delta-flink/api/java/index.html But you would still have to "extract" the version number from the checkpoint, and I guess this part is tricky. I don't think we log the Delta version or file names during checkpointing. BTW, logging file names could flood the logs, since there might be tons of files. Also, logging the current version might be tricky, since reading is a task distributed between the Task and Job managers. I know that there is the Flink State Processor API [1]; I was always curious whether it could be used for SQL state migration. I always wanted to do a PoC for this.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
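A minimal sketch of the startingVersion builder option mentioned above; the table path, Hadoop configuration, and version number are placeholders:

import io.delta.flink.source.DeltaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

public class StartingVersionExample {
    public static DeltaSource<RowData> buildSource() {
        // Start reading changes from Delta table version 10 (placeholder value)
        // instead of resuming from Flink's checkpointed state.
        return DeltaSource
            .forContinuousRowData(
                new Path("s3a://my-bucket/my-delta-table"), // placeholder path
                new Configuration())
            .startingVersion(10L)
            .build();
    }
}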
> Also, logging the current version might be tricky, since reading is a task distributed between the Task and Job managers.
My point was that even while some readers are still processing data from Delta version 10, other readers can already be processing version 11, and that could be the last checkpointed version. For DataStream jobs this is not a problem, because the reader state is also checkpointed. So after recovery, Flink would resume from Delta version 11, but the state of that one reader would also be recovered -> data from Delta version 10. If you wanted to manually set "startingVersion" based on a version extracted from the source state using the State Processor API, you might miss data from late readers. So using version 11 in my example would not guarantee that you have all the data. That's tricky.
Ans Fida
01/30/2023, 11:18 PM

Krzysztof Chmielewski
01/31/2023, 1:23 PM

Kenny Ma
01/31/2023, 6:59 PM

Krzysztof Chmielewski
01/31/2023, 8:22 PM

Kenny Ma
01/31/2023, 9:33 PM

Krzysztof Chmielewski
01/31/2023, 10:00 PM
> This is not a problem for some of our jobs that consume from Kafka because Kafka is also maintaining the state (committed offset) for us so starting a job without checkpoint can still resume from the last committed offset.
Yeah, this also happens during a Flink checkpoint. The Kafka Source triggers an offset commit during the Flink checkpoint, but I guess the difference is that the exact offset value is maintained on the Kafka broker side. So whenever you restart, Kafka gives you whatever data is above that last committed offset.
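For comparison, a minimal sketch of a Kafka source that resumes from broker-side committed offsets; the bootstrap servers, topic, and group id are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaResumeExample {
    public static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092") // placeholder
            .setTopics("my-topic")                 // placeholder
            .setGroupId("my-group")                // placeholder
            // Resume from the offsets committed to the broker (committed at
            // Flink checkpoint time); fall back to earliest if none exist.
            .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    }
}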
Kenny Ma
01/31/2023, 10:27 PM

Ans Fida
01/31/2023, 10:33 PM

Krzysztof Chmielewski
02/01/2023, 5:58 PM

Kenny Ma
02/10/2023, 6:02 PM
SavepointReader savepoint = SavepointReader.read(streamExecutionEnvironment, "/checkpoint-location", new HashMapStateBackend());
but I am getting this error when trying to read it: "The implementation of the RichParallelSourceFunction is not serializable. The object probably contains or references non serializable fields."
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

DataStream<KeyedState> keyedState = savepoint.readKeyedState("delta-source", new ReaderFunction());

class KeyedState {
    public int key;
    public int value;
    public List<Long> times;
}

class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {

    ValueState<Integer> state;
    ListState<Long> updateTimes;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
        state = getRuntimeContext().getState(stateDescriptor);
        ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
        updateTimes = getRuntimeContext().getListState(updateDescriptor);
    }

    @Override
    public void readKey(Integer key, Context ctx, Collector<KeyedState> out) throws Exception {
        KeyedState data = new KeyedState();
        data.key = key;
        data.value = state.value();
        data.times = StreamSupport
            .stream(updateTimes.get().spliterator(), false)
            .collect(Collectors.toList());
        out.collect(data);
        System.out.println(out.toString());
    }
}
Krzysztof Chmielewski
02/10/2023, 6:09 PM
I'm not sure savepoint.readKeyedState is the correct API to use here. The Delta Sink does not use keyed state; it has more of an "operator state".

Kenny Ma
02/10/2023, 6:20 PM
I'm not sure savepoint.readKeyedState is the correct way to extract the values. I've also tried readListState, readUnionState, and print, but I'm not having luck getting/printing the values.

Krzysztof Chmielewski
02/10/2023, 6:24 PM

Kenny Ma
02/10/2023, 6:29 PM

Krzysztof Chmielewski
02/10/2023, 7:41 PM

Ans Fida
02/10/2023, 7:41 PM

Krzysztof Chmielewski
02/10/2023, 7:43 PM

Ans Fida
02/10/2023, 7:44 PM

Kenny Ma
02/10/2023, 8:00 PM

Krzysztof Chmielewski
02/10/2023, 8:46 PM
env
    .fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "Processing pipeline Delta Source")
    .setParallelism(1)
    .name("myStreamingDeltaSource")
    .uid("myStreamingDeltaSourceUid") // <- this is the UID we will need later
    .sinkTo(secondDeltaSink);
I'm attaching the source code (a unit test) that deserializes the DeltaEnumeratorStateCheckpoint from a Flink checkpoint.
In that code you need to change the path to the checkpoint (remember to include the chk- folder)
and replace the uid with the one that was used for your source; both spots are marked with comments.
The test will print the checkpointed Delta snapshot version and the paths of the not-yet-assigned splits.
I'm thinking it would also be good to restore the state of the readers, because those might also have splits from older versions.
package org.example.streamprocessor;

import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpoint;
import io.delta.flink.source.internal.state.DeltaPendingSplitsCheckpointSerializer;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import io.delta.flink.source.internal.state.DeltaSourceSplitSerializer;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.junit.jupiter.api.Test;

public class CheckpointReadTest {

    @Test
    public void testDeserializeEnumState() throws IOException {
        // Change this to your checkpoint location; remember to include the chk- folder.
        File resourcesDirectory = new File("src/test/resources/e920a0830ddb1308aa5f2b03b94a3a72/chk-165");
        CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(resourcesDirectory.getAbsolutePath());

        int maxParallelism = metadata.getOperatorStates().stream()
            .map(OperatorState::getMaxParallelism)
            .max(Comparator.naturalOrder())
            .orElseThrow(() -> new RuntimeException("Savepoint must contain at least one operator state."));

        SavepointMetadataV2 savepointMetadata = new SavepointMetadataV2(
            maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());

        // Change the uid here to the one that was used for your source.
        byte[] data = savepointMetadata
            .getOperatorState("myStreamingDeltaSourceUid")
            .getCoordinatorState()
            .getData();

        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
             DataInputStream in = new DataInputViewStreamWrapper(bais)) {

            final int coordinatorSerdeVersion = readAndVerifyCoordinatorSerdeVersion(in);
            int enumSerializerVersion = in.readInt();
            int serializedEnumChkptSize = in.readInt();
            byte[] serializedEnumChkpt = readBytes(in, serializedEnumChkptSize);

            if (coordinatorSerdeVersion != SourceCoordinatorSerdeUtils.VERSION_0
                    && bais.available() > 0) {
                throw new IOException("Unexpected trailing bytes in enumerator checkpoint data");
            }

            // Deserialize the Delta source enumerator state using the connector's own serializer.
            DeltaPendingSplitsCheckpointSerializer<DeltaSourceSplit> des =
                new DeltaPendingSplitsCheckpointSerializer<>(DeltaSourceSplitSerializer.INSTANCE);
            DeltaEnumeratorStateCheckpoint<DeltaSourceSplit> deserializeEnumeratorState =
                des.deserialize(enumSerializerVersion, serializedEnumChkpt);

            System.out.println("Delta Snapshot Version: " + deserializeEnumeratorState.getSnapshotVersion());

            // Splits that were not yet assigned to readers.
            for (DeltaSourceSplit split : deserializeEnumeratorState.getSplits()) {
                System.out.println(split.path());
            }
        }
    }

    static byte[] readBytes(DataInputStream in, int size) throws IOException {
        byte[] bytes = new byte[size];
        in.readFully(bytes);
        return bytes;
    }

    static int readAndVerifyCoordinatorSerdeVersion(DataInputStream in) throws IOException {
        int version = in.readInt();
        if (version > 1) {
            throw new IOException("Unsupported source coordinator serde version " + version);
        }
        return version;
    }
}
Kenny Ma
02/10/2023, 9:11 PM