Ans Fida

01/30/2023, 6:57 PM
Does the Delta connector provide a checkpointing mechanism? I have a Flink job that reads from a Delta connector, and if the Flink job has to be restarted, I want to specify the timestamp or version from which to resume processing data from the Delta connector. Can anyone provide any insights into this?

Scott Sandre (Delta Lake)

01/30/2023, 9:23 PM
cc @Krzysztof Chmielewski

Krzysztof Chmielewski

01/30/2023, 9:34 PM
Hi @Ans Fida
Does Delta connector provide a checkpointing mechanism?
Yes, the Delta Flink Source connector participates in Flink's checkpointing mechanism for both bounded and continuous sources.
I want to specify the time stamp or version that I should use to resume processing data from the Delta connector.
You don't need to do that. The Delta connector will use its state, which was checkpointed by Flink, to recover the job from the last known checkpoint automatically. For a bounded Delta source (batch jobs), during job recovery the connector will "know" from its checkpointed state which Delta snapshot version was originally processed and which files were already read. It will skip those already-processed files and resume reading from the ones that still need to be processed. For a continuous source (streaming jobs) the process is similar: the connector will resume from the last checkpointed Delta version and skip the already-processed files for that version. Please let me know in case of further questions.
Please keep in mind that files/Delta versions that were processed between checkpoints will be reprocessed after recovery. It would be good to have a transactional sink (for example the Delta Sink or Kafka Sink) to get an end-to-end exactly-once guarantee.
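For context, a minimal sketch of attaching a transactional Delta sink in the DataStream API; the table path, Hadoop configuration, and row schema below are placeholder assumptions, and `stream` stands for an existing DataStream<RowData>:
Copy code
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// A sketch with placeholder schema and table path. The sink commits to the
// Delta log as part of Flink checkpoints, which is what gives the end-to-end
// exactly-once behavior mentioned above.
RowType rowType = RowType.of(
    new LogicalType[] {new IntType(), new VarCharType(255)},
    new String[] {"id", "name"});
DeltaSink<RowData> deltaSink = DeltaSink
    .forRowData(new Path("s3://bucket/delta-table"), new Configuration(), rowType)
    .build();
stream.sinkTo(deltaSink);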

Ans Fida

01/30/2023, 9:41 PM
Thanks for explaining this. Just to double-confirm: once Flink checkpoints, the Delta Flink connector will make a corresponding checkpoint on its side, and later, if the application was terminated and has to be restarted without the Flink checkpoint but we still want to process data from the Delta connector only from where it last checkpointed, it would just automatically do that?
Please keep in mind that files/delta versions that were processed between checkpoints will be reprocessed again after recovery.
FC: Flink checkpoint & DV: Delta version. So, if FC1 corresponds to DV10, and later, after some time has passed, we're at DV18 but Flink has not checkpointed again and the job failed, on recovery Delta will resume from DV10, correct @Krzysztof Chmielewski?

Krzysztof Chmielewski

01/30/2023, 10:23 PM
once Flink checkpoints, Delta flink connector will make a corresponding checkpoint on it’s side and later if the application was terminated and has to be restarted without the Flink checkpoint
Let me clarify. The Delta connector (source and sink) does not do any checkpoints on its own. It simply participates in Flink's checkpointing mechanism, meaning the Delta Source's inner state will be included in Flink's state checkpoint. For streaming jobs, in order to make this work your Flink job has to have checkpoints enabled [1]. Only then will your job resume processing as I described in my previous message. Without Flink's checkpointing mechanism enabled, for streaming jobs, the Delta Source will read the entire data again. For batch jobs the case is slightly different: for batch job recovery, Flink does not use checkpoints but still persists operator state. In this case no extra configuration is needed.
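For reference, a minimal sketch of enabling checkpointing on the streaming environment; the interval is an example value:
Copy code
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 seconds with exactly-once semantics; the interval is an
// example value and bounds how much data may be reprocessed after a recovery.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);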
FC: Flink checkpoint & DV: Delta version. So, if FC1 corresponds to DV10, and later, after some time has passed, we're at DV18 but Flink has not checkpointed again and the job failed, on recovery Delta will resume from DV10, correct
Yes, this is correct. That's why Flink's checkpoint interval should be tuned to mitigate any delays caused by reprocessing data after recovery. You have a very similar thing with the Kafka Source, where the Kafka offset is committed by the Flink Kafka connector back to the Kafka broker during a Flink checkpoint. So after recovery Kafka resumes from the last known offset; in our case, from the last known version.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/

Ans Fida

01/30/2023, 10:30 PM
I see, we do have Flink checkpointing enabled, but there are some cases where we can't use those checkpoints (e.g., we need to run the job with a new SQL and operator order). However, in that case, we'd still like to process data from the version where Flink had last checkpointed by specifying the startingVersion on the Delta connector. Any ideas on how we can get that version number if we don't want to resume from the Flink checkpoint?

Krzysztof Chmielewski

01/30/2023, 10:35 PM
we need to run the job with a new SQL and operator order
ahh, SQL... 🙂 I know that "problem", it's the case where the job plan for the SQL has changed and Flink cannot recover the state, right? BTW, you are mixing the Table and DataStream APIs, right? I'm asking because SQL support for the Delta connector is under development right now.

Ans Fida

01/30/2023, 10:39 PM
Yep, you got it. We're mixing Table and DataStream for our usage.
Alternatively, while Flink is reading from the Delta connector, is there a way to know which files are being read? We can use that information to trace back the version that we'd need later.

Krzysztof Chmielewski

01/30/2023, 10:52 PM
Flink had last checkpointed by specifying the startingVersion on Delta connector.
Yes, the Delta Source builder has an API to specify a starting version. In case you don't have it already, please look for "startingVersion" under this link: https://delta-io.github.io/connectors/latest/delta-flink/api/java/index.html
But you would still have to "extract" the version number from the checkpoint, and I guess this part is tricky. I don't think we log the Delta version or file names during checkpointing. BTW, logging file names could flood the logs since there might be tons of files. Logging the current version might also be tricky, since reading is a task distributed between the Task and Job Managers.
I know that there is the Flink State Processor API [1]. I was always curious whether it could be used for SQL state migration; I always wanted to do a PoC for this.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
In theory you should be able to extract the Source state -> version number using it, but I've never tried that.
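For example, a minimal sketch of setting startingVersion on the source builder; the table path, Hadoop configuration, and version number are placeholders:
Copy code
import io.delta.flink.source.DeltaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

// A sketch: build a continuous (streaming) Delta source that starts reading
// from an explicitly chosen table version. Path, configuration, and version 10
// are placeholder values.
DeltaSource<RowData> source = DeltaSource
    .forContinuousRowData(new Path("s3://bucket/delta-table"), new Configuration())
    .startingVersion(10L)
    .build();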
Regarding my:
Logging the current version might also be tricky, since reading is a task distributed between the Task and Job Managers.
My point was that while some readers can still be processing data from Delta version 10, other readers can already be processing version 11, and this could be the last checkpointed version. In the case of DataStream jobs this is not a problem, because the reader state is also checkpointed. So after recovery, Flink would resume from Delta version 11, but the state from that one reader will also be recovered -> data from Delta version 10. However, if you were to manually set "startingVersion" based on the version extracted from the source state using the State Processor API, you might miss data from those late readers. So using version 11 in my example would not guarantee that you get all the data. That's tricky.

Ans Fida

01/30/2023, 11:18 PM
Got it, yep, I was looking into the Flink State Processor API, but its documentation and usability are limited as it's primarily for internal usage.
Thanks for the help, I'll keep looking into the Flink State Processor API to extract the version number (or timestamp).
Btw, when the Flink job is running with parallelism, how does the Delta connector track the versions/files across operators?

Krzysztof Chmielewski

01/31/2023, 1:23 PM
The Delta Source is implemented using the unified Source API [1]. The key components are:
• Split enumerator - always a single instance per job; runs on the Job Manager.
• Source readers - number of instances = source parallelism level; run on the Task Managers.
• Split objects - information about which file to read.
The enumerator reads the Delta log and, based on Delta's AddFile actions, creates Split objects that are stored in the enumerator's state and later sent one by one to the source readers. The enumerator maintains the list of unassigned splits and the current snapshot version - DeltaEnumeratorStateCheckpoint.java. This information is included in Flink's checkpoint, and there is only one instance of the split enumerator regardless of the parallelism level. However, like I said, the enumerator also maintains a list of unassigned splits, representing files that are not yet assigned to readers. Those files can be from a lower version than the current snapshotVersion checkpointed by the enumerator. Maybe it would be possible to use the State Processor API to scan the pendingSplitsCheckpoint list from DeltaEnumeratorStateCheckpoint as well. However, the DeltaSplit object does not contain information about the snapshot version. It does have the path to the underlying parquet file, though. I think it would be possible to tell which Delta version a particular parquet file belongs to. A cumbersome task...
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
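If one ever needed to map a split's parquet path back to the Delta version that added it, here is a sketch of the idea using the Delta Standalone getChanges API; the table path and file name are placeholders, and scanning the whole log is what makes this cumbersome:
Copy code
import io.delta.standalone.DeltaLog;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;

// A sketch: walk the Delta log from version 0 and report the version whose
// AddFile action references the given parquet file. Path and file name are
// placeholder values.
DeltaLog deltaLog = DeltaLog.forTable(new Configuration(), "s3://bucket/delta-table");
String wantedFile = "part-00000-example.snappy.parquet"; // hypothetical file name
Iterator<VersionLog> changes = deltaLog.getChanges(0, false);
while (changes.hasNext()) {
    VersionLog versionLog = changes.next();
    for (Action action : versionLog.getActions()) {
        if (action instanceof AddFile
                && ((AddFile) action).getPath().endsWith(wantedFile)) {
            System.out.println("Added in Delta version " + versionLog.getVersion());
        }
    }
}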

Kenny Ma

01/31/2023, 6:59 PM
Hi @Krzysztof Chmielewski - I work with @Ans Fida. Thanks for all the great advice and for explaining the internal workings of the Flink Delta connector. This is my understanding:
• We cannot use the snapshotVersion from the checkpoint because there could be unassigned splits belonging to a lower version.
• We will need to get the unassigned splits from the checkpoint state, somehow get the version of the unassigned splits, and use min(unassigned_splits_version, snapshotVersion) to resume processing in the new job started w/o a checkpoint.

Krzysztof Chmielewski

01/31/2023, 8:22 PM
Hi @Kenny Ma, yes, your summary is accurate. But please let me highlight that what we are discussing here is a workaround for a known Flink limitation: state migration is not possible for SQL jobs when the execution plan has changed between checkpoints/savepoints. And using the State Processor API for this is rather unknown territory, at least for me. It would not be something I can recommend, but rather something to try. I also wonder if there is anything else we could do. For example, what is the reason that the execution plan has changed? The business logic has changed and you are simply submitting a new job using the previous checkpoint? Is switching to the pure DataStream API an option here? If not, maybe you could use some old Delta version that you know was already processed. Maybe the business logic can accept some "controlled" amount of already-processed data, and you could use the "startingTimestamp" option and try to set a timestamp that would be 6h or 12h before the restart?
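A minimal sketch of that last idea; the path, configuration, and timestamp string (here, roughly 12h before a restart) are placeholder assumptions:
Copy code
import io.delta.flink.source.DeltaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

// A sketch: accept some controlled reprocessing by resuming from a timestamp
// safely before the restart. All values below are placeholders.
DeltaSource<RowData> source = DeltaSource
    .forContinuousRowData(new Path("s3://bucket/delta-table"), new Configuration())
    .startingTimestamp("2023-01-31T12:00:00.000Z")
    .build();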

Kenny Ma

01/31/2023, 9:33 PM
Hi @Krzysztof Chmielewski - I would prefer Flink to fix this limitation so we don't have to try this hack. This is not a problem for some of our jobs that consume from Kafka, because Kafka also maintains the state (the committed offset) for us, so starting a job without a checkpoint can still resume from the last committed offset. The reason we run into this situation is that the business logic is in the SQL queries and the logic is updated often. Migrating to the DataStream API at this point is not an option for us. We do not guarantee exactly-once processing, and re-processing old data is acceptable. Like you said, we can start processing from n versions before snapshotVersion or x hours before the Split object timestamp. If we want to avoid missing/skipping data we will need to be aggressive with the re-processing, but the trade-off is an unnecessary latency increase.
👍 1

Krzysztof Chmielewski

01/31/2023, 10:00 PM
This is not a problem for some of our jobs that consume from Kafka because Kafka is also maintaining the state (committed offset) for us so starting a job without checkpoint can still resume from the last committed offset.
Yeah, this also happens during a Flink checkpoint. The Kafka Source triggers an offset commit during the Flink checkpoint, but I guess the difference is that the exact offset value is maintained on the Kafka broker side. So whenever you restart, Kafka gives you whatever data is above that last committed offset.
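For comparison, a minimal sketch of a Kafka source resuming from broker-side committed offsets; the topic, group id, and bootstrap servers are placeholders:
Copy code
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// A sketch: start from the consumer group's committed offsets, falling back to
// EARLIEST when none exist. Topic, group id, and servers are placeholders.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("my-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();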

Kenny Ma

01/31/2023, 10:27 PM
Exactly! The committed offset is tracked in both the Flink checkpoint and the Kafka broker.

Ans Fida

01/31/2023, 10:33 PM
Does Delta have any plans to add similar functionality? I can see it benefiting cases like ours.

Krzysztof Chmielewski

02/01/2023, 5:58 PM
From my perspective, I don't think it could be easily added to the Delta Standalone library that the connector uses to read/write Delta, or to the Delta protocol. I guess @Scott Sandre (Delta Lake) would be the best person to ask here.

Kenny Ma

02/10/2023, 6:02 PM
@Krzysztof Chmielewski I wonder if you have tried loading and extracting the DeltaSource state values from a Flink savepoint/checkpoint using the State Processor API. I ran into a serialization error trying to read the DeltaSource state. This is what I do to load the checkpoint from Java (no issue):
Copy code
SavepointReader savepoint = SavepointReader.read(streamExecutionEnvironment, "/checkpoint-location", new HashMapStateBackend());
but I am getting this error when trying to read it:
The implementation of the RichParallelSourceFunction is not serializable. The object probably contains or references non serializable fields.
Copy code
DataStream<KeyedState> keyedState = savepoint.readKeyedState("delta-source", new ReaderFunction());
Copy code
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

class KeyedState {
    public int key;

    public int value;

    public List<Long> times;
}

class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {

    ValueState<Integer> state;

    ListState<Long> updateTimes;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
        state = getRuntimeContext().getState(stateDescriptor);

        ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
        updateTimes = getRuntimeContext().getListState(updateDescriptor);
    }

    @Override
    public void readKey(
            Integer key,
            Context ctx,
            Collector<KeyedState> out) throws Exception {

        KeyedState data = new KeyedState();
        data.key = key;
        data.value = state.value();
        data.times = StreamSupport
                .stream(updateTimes.get().spliterator(), false)
                .collect(Collectors.toList());

        out.collect(data);
        System.out.println(out.toString());
    }
}

Krzysztof Chmielewski

02/10/2023, 6:09 PM
Hi, thanks for the update. I have never played with the State Processor API, unfortunately; I was describing it as a solution that could theoretically work. Does your job contain any sources other than the Delta Source?
Also, I wonder if savepoint.readKeyedState is the correct API to use here. The Delta Source does not use keyed state; it has more of an "operator state".

Kenny Ma

02/10/2023, 6:20 PM
The DeltaSource is the only source. Since I am getting the state by uid, it shouldn't matter if there are other sources. I am not familiar with the State Processor API, so I'm not sure if savepoint.readKeyedState is the correct way to extract the values. I've also tried readListState, readUnionState, and print, but I haven't had luck getting/printing the values.

Krzysztof Chmielewski

02/10/2023, 6:24 PM
Yeah, all of those are states that would be used in a process function. I'm wondering if the State Processor API allows reading operator state. I will play with this a little bit and let you know.

Kenny Ma

02/10/2023, 6:29 PM
Thanks @Krzysztof Chmielewski! Appreciate your feedback and help!

Krzysztof Chmielewski

02/10/2023, 7:41 PM
Are you using Flink 1.15.x?

Ans Fida

02/10/2023, 7:41 PM
on Flink 1.16.0 right now
👍 1

Krzysztof Chmielewski

02/10/2023, 7:43 PM
BTW, I would strongly suggest moving to 1.16.1. There were bugs in Flink's Sink architecture (Flink base code) that were fixed by us and released in 1.15.3 and 1.16.1. Those bugs were causing data loss for the Sink.

Ans Fida

02/10/2023, 7:44 PM
Oh, yeah it should be straightforward for us to migrate to 1.16.1, thanks for the heads up!

Kenny Ma

02/10/2023, 8:00 PM
I am using 1.16.1.

Krzysztof Chmielewski

02/10/2023, 8:46 PM
OK @Ans Fida and @Kenny Ma, I think I have it. But first let me once again say that this is not a recommended way; we are simply facing a Flink limitation here, that SQL jobs can't be restored from a checkpoint if the job plan has changed. However, if we want to play with the State Processor API and restore the Delta Source enumerator state from a checkpoint, this is how you can do it: First you need to make sure that you have assigned a "uid" to the Delta Source in the DataStream API. I'm not talking about ".name" but ".uid(...)", something like this:
Copy code
env
    .fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "Processing pipeline Delta Source")
    .setParallelism(1)
    .name("myStreamingDeltaSource")
    .uid("myStreamingDeltaSourceUid") // <- this is the UID we will need later
    .sinkTo(secondDeltaSink);
I'm attaching the source code (a unit test) that deserializes the DeltaEnumeratorStateCheckpoint from a Flink checkpoint. In that code you need to change the path to the checkpoint (line 24; remember to include the chk- folder), and you need to replace the uid with the one used for your source (line 43). The test will print the Delta checkpoint version and the paths of the splits not yet assigned. I'm thinking it would also be good to restore the state of the readers, because those might also have splits from older versions.
Copy code
package org.example.streamprocessor;

import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpoint;
import io.delta.flink.source.internal.state.DeltaPendingSplitsCheckpointSerializer;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import io.delta.flink.source.internal.state.DeltaSourceSplitSerializer;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.junit.jupiter.api.Test;

public class CheckpointReadTest {

    @Test
    public void testDeserializeEnumState() throws IOException {
        File resourcesDirectory = new File("src/test/resources/e920a0830ddb1308aa5f2b03b94a3a72/chk-165");

        CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(resourcesDirectory.getAbsolutePath());

        int maxParallelism =
            metadata.getOperatorStates().stream()
                .map(OperatorState::getMaxParallelism)
                .max(Comparator.naturalOrder())
                .orElseThrow(
                    () ->
                        new RuntimeException(
                            "Savepoint must contain at least one operator state."));

        SavepointMetadataV2 savepointMetadata =
            new SavepointMetadataV2(
                maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());

        // change uid here
        byte[] data =
            savepointMetadata.getOperatorState("myStreamingDeltaSourceUid").getCoordinatorState()
                .getData();


        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
            DataInputStream in = new DataInputViewStreamWrapper(bais)) {
            final int coordinatorSerdeVersion = readAndVerifyCoordinatorSerdeVersion(in);
            int enumSerializerVersion = in.readInt();
            int serializedEnumChkptSize = in.readInt();
            byte[] serializedEnumChkpt = readBytes(in, serializedEnumChkptSize);

            if (coordinatorSerdeVersion != SourceCoordinatorSerdeUtils.VERSION_0
                && bais.available() > 0) {
                throw new IOException("Unexpected trailing bytes in enumerator checkpoint data");
            }

            DeltaPendingSplitsCheckpointSerializer<DeltaSourceSplit> des = new DeltaPendingSplitsCheckpointSerializer<>(
                DeltaSourceSplitSerializer.INSTANCE);

            DeltaEnumeratorStateCheckpoint<DeltaSourceSplit> deserializeEnumeratorState =
                des.deserialize(enumSerializerVersion, serializedEnumChkpt);

            System.out.println("Delta Snapshot Version: " + deserializeEnumeratorState.getSnapshotVersion());

            // Splits that were not yet assigned to readers.
            for (DeltaSourceSplit split : deserializeEnumeratorState.getSplits()) {
                System.out.println(split.path());
            }
        }
    }

    static byte[] readBytes(DataInputStream in, int size) throws IOException {
        byte[] bytes = new byte[size];
        in.readFully(bytes);
        return bytes;
    }

    static int readAndVerifyCoordinatorSerdeVersion(DataInputStream in) throws IOException {
        int version = in.readInt();
        if (version > 1) {
            throw new IOException("Unsupported source coordinator serde version " + version);
        }
        return version;
    }

}
I took this logic from the Flink base code (the checkpoint restore path), plus some from the State Processor API internals.
🙌 1

Kenny Ma

02/10/2023, 9:11 PM
Awesome! Will give it a try. Thanks @Krzysztof Chmielewski!
It works on my end too!
🎉 2
👍 1