
Kenny Ma

04/07/2023, 12:00 AM
Hi - I am using the Delta connector for Flink to stream from a Delta table. When starting from an old version (>50 versions behind the latest), it doesn't consume any data. It works if the starting version is a couple of versions from the latest. Has anyone seen this behavior, or any suggestions to troubleshoot/fix the issue? cc @Krzysztof Chmielewski

Krzysztof Chmielewski

04/07/2023, 12:25 PM
Hi @Kenny Ma, I will craft a quick test for time travel to -50 versions and I will let you know.
Ok @Kenny Ma, so I've created a Delta table using the Delta sink that has 80 versions. Every version has 10 records. I used the Bounded and Continuous Delta Source to time travel to version 10 and to version 1. In all cases I was able to read data from that version. Could you verify whether your table can be read by a Spark job from the same version that does not seem to work for the Delta Source? Also, could you tell me whether you are using the Bounded or Continuous Delta Source for Flink, and which Flink version and connector version?
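For reference, the two time-travel entry points exercised in that test map onto the builder API roughly like this (a minimal sketch; the table path and version numbers are placeholders):
import io.delta.flink.source.DeltaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

public class TimeTravelSourcesSketch {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        Path tablePath = new Path("s3a://my-bucket/my-delta-table"); // placeholder path

        // Bounded source: read the table as it looked at version 10.
        DeltaSource<RowData> bounded = DeltaSource
                .forBoundedRowData(tablePath, hadoopConf)
                .versionAsOf(10)
                .build();

        // Continuous source: start streaming table changes from version 1 onwards.
        DeltaSource<RowData> continuous = DeltaSource
                .forContinuousRowData(tablePath, hadoopConf)
                .startingVersion(1)
                .build();
    }
}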

Kenny Ma

04/07/2023, 4:27 PM
Hi @Krzysztof Chmielewski, thanks for the quick response as always! Here is our setup:
• Flink 1.16.1 and Delta connector 0.6.0
• We are trying to continuously stream from a Delta table written by Spark. Every version has over 100 parquet files and Spark checkpoints every 10 versions. We have over 4700 versions in the Delta table.
When trying to restore from an old version (4600), I don't see any data consumed by Flink/DeltaSource after waiting for more than 10 minutes, while starting from a more recent version always works. I don't know if it matters, but I see in the Flink log from
io.delta.standalone.internal.DeltaLogImpl
that it is starting and loading the most recent Spark checkpoint despite trying to start from a much older version. I will try to enable debugging and do more troubleshooting on our end. Thanks!

Scott Sandre (Delta Lake)

04/07/2023, 4:29 PM
Version 0.6.0 is for Flink 1.15.3, not 1.16.1 yet, btw. How are you specifying the version to start from? i.e. show the configuration you're using to create your Delta source.

Krzysztof Chmielewski

04/07/2023, 4:31 PM
0.6.0 can work with 1.16. We don't ship any Flink dependency with the connector, so if the runtime API is compatible that's ok.
And the API is compatible.
I was testing on a 1.16.1 cluster.
@Scott Sandre (Delta Lake) the Flink version is specified in the job's pom.xml, like we have in our examples.
@Scott Sandre (Delta Lake) is there any table version retention/pruning in Delta, so that very old versions could be deleted?

Scott Sandre (Delta Lake)

04/07/2023, 4:41 PM
The default log retention duration is 30 days, which roughly means you can time travel 30 days back by default. We require a checkpoint to exist before the version you are time travelling to. I'm more curious which configuration property is being used here.
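For anyone following along, the retention setting mentioned above can be inspected from Spark; a minimal sketch (the table path is a placeholder):
import org.apache.spark.sql.SparkSession;

public class RetentionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("retention-check").getOrCreate();

        // DESCRIBE DETAIL returns one row with a "properties" map; an overridden
        // delta.logRetentionDuration would show up there (the default is 30 days).
        spark.sql("DESCRIBE DETAIL delta.`s3a://my-bucket/my-delta-table`").show(false);
    }
}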

Kenny Ma

04/07/2023, 4:43 PM
This is how we create the DeltaSource and use startingVersion() to travel back:
Configuration hadoopConf = new Configuration();
sourceConfig.getHadoopConf().forEach(hadoopConf::setStrings);
DeltaSourceBuilderBase<RowData, ?> deltaSourceBuilder =
        DeltaSource.forContinuousRowData(
                        new Path(sourceConfig.getTablePath()), hadoopConf)
                .updateCheckIntervalMillis(Duration.ofSeconds(10).toMillis());
((RowDataContinuousDeltaSourceBuilder) deltaSourceBuilder).startingVersion(sourceConfig.getStartingVersion());
DeltaSource<RowData> deltaSource = deltaSourceBuilder.build();
It's not a retention problem, because the data is still there and I am trying to travel back to data created only a few hours ago.

Krzysztof Chmielewski

04/07/2023, 4:46 PM
Ok I will try your setup on my data and I will let you know.
🙏 1
And where is sourceConfig coming from? Is it a POJO-like thing?

Kenny Ma

04/07/2023, 4:47 PM
Yeah, a POJO-like thing. You can just hardcode the version.

Krzysztof Chmielewski

04/07/2023, 4:49 PM
And when you try to read from the newest version, it works, right?

Kenny Ma

04/07/2023, 4:49 PM
You can skip the hadoop config too. We are using it to set
fs.s3a.aws.credentials.provider
Correct, the newest always works.

Krzysztof Chmielewski

04/07/2023, 4:51 PM
And it has more than 4k versions? And it stops working when you time travel back more than 50 versions, right? So for example 4500?
My test table had 80... ;) Could you also check whether you can read that version using Spark?
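For the Spark-side check, a minimal time-travel read sketch (the version and path are placeholders matching the numbers discussed above):
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TimeTravelReadCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("time-travel-check").getOrCreate();

        // Read the same historical version that the Flink Delta Source struggles with.
        Dataset<Row> df = spark.read()
                .format("delta")
                .option("versionAsOf", 4600)               // placeholder version
                .load("s3a://my-bucket/my-delta-table");   // placeholder path

        System.out.println("row count at version 4600: " + df.count());
    }
}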

Kenny Ma

04/07/2023, 4:53 PM
That's correct. I don't know if you can produce a Delta table with Spark, with checkpoints, and try to read it from Flink, but that's our setup.

Krzysztof Chmielewski

04/07/2023, 4:54 PM
You should be able to. Some of the test tables we have are generated via a Spark job, but without checkpoints - maybe this is it. I will verify.
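If it helps with reproducing, a hedged sketch of generating such a Spark-written test table: with delta.checkpointInterval at its default of 10 commits, the _delta_log ends up containing checkpoint files just like the table described above (path and sizes are placeholders):
import org.apache.spark.sql.SparkSession;

public class CheckpointedTestTable {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("checkpointed-test-table").getOrCreate();
        String path = "s3a://my-bucket/my-test-delta-table"; // placeholder path

        // 80 append commits of 10 rows each; with the default delta.checkpointInterval
        // of 10, Spark writes a *.checkpoint.parquet into _delta_log every 10th commit,
        // mirroring the "checkpoint every 10 versions" setup described earlier.
        for (int i = 0; i < 80; i++) {
            spark.range(10).write().format("delta").mode("append").save(path);
        }
    }
}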

Kenny Ma

04/07/2023, 4:54 PM
My colleagues don't seem to have any issues with reads/writes from Spark.
👍 1

Krzysztof Chmielewski

04/07/2023, 4:54 PM
I need to go for now but I will let you know.
👍 1

Kenny Ma

04/07/2023, 4:55 PM
I was surprised to see the message from
io.delta.standalone.internal.DeltaLogImpl
in the Flink log. Why is it trying to load a Spark checkpoint?

Krzysztof Chmielewski

04/07/2023, 4:56 PM
A Standalone optimization, I guess. @Scott Sandre (Delta Lake) can give more insight about that one.
The Delta protocol is common to Spark and Flink.

Kenny Ma

04/07/2023, 4:57 PM
Yeah it’s using the same Delta standalone library so maybe it’s normal and an optimization thing like you said.

Scott Sandre (Delta Lake)

04/07/2023, 5:03 PM
Delta checkpoints have nothing to do with Spark or Flink; they are part of the Delta protocol, and the spark-delta connector and flink-delta connector both implement that protocol.
👍 1
I’m referring to a delta checkpoint, which is the metadata state of the table saved in a parquet file. This is separate from a Flink streaming checkpoint
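For illustration, the _delta_log directory of such a table holds both kinds of files (a schematic listing, not taken from the real table):
_delta_log/
  00000000000000004590.checkpoint.parquet   <- Delta checkpoint: table metadata state
  00000000000000004591.json                 <- one JSON commit file per table version
  ...
  00000000000000004600.json
  _last_checkpoint                           <- pointer to the most recent checkpoint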

Kenny Ma

04/12/2023, 10:49 PM
@Krzysztof Chmielewski it seems the longer the lookback/time travel, the longer it takes for the task managers to start fetching the splits/data. The job manager is also trying to fetch data on job submission and has high memory utilization (10G) compared to a KafkaSource. I am trying to understand why the job manager is fetching data on job submission.
After I enabled debug, it looks like the job manager is making an HTTP call for every split, maybe to verify the split objects before distributing them to the task managers. We have 100 splits in a version, so this check can take a while before the job starts consuming data. Our watermark is computed using the event timestamp from the data. If a checkpoint is triggered before there is a watermark, the checkpoint will fail and cause the job to fail after a couple of checkpoint failures. The challenge is not knowing how long it will take for the task managers to start consuming the data, because it depends on how far back the time travel goes.

Krzysztof Chmielewski

04/13/2023, 3:24 PM
Hi @Kenny Ma, first we need to clarify the "Split" thing :) A Split is a data object used to transfer information from the Source Enumerator to the Source Reader; it's a pure Flink concept. So saying that your Delta table has X splits is not really the right way to put it, since a Delta table has no notion of Splits. For our connector a Split in most cases represents a parquet file, but for some file systems one parquet file can be divided into multiple splits. So when you wrote:
We have 100 splits in a version
this will confuse the Databricks/Delta folks, since they don't know what a Split is - it's Flink specific. It would be better to say "100 parquet files per version". I know the details of the connector, so that's why it's clear to me. Regarding:
The job manager is also trying to fetch data on job submission
Yes, during job submission the Delta Source creates what is called a DeltaLog instance, which has to compute a table Snapshot instance -> both types come from the delta standalone library. Unfortunately, computing the snapshot can be a CPU/memory intensive operation for large tables, since the delta standalone library has to read all delta log JSON files and compute the overall table snapshot. When you submit a job with a Delta Source, two things happen:
1. Your main method is executed -> you are calling the DeltaSourceBuilder, which creates a DeltaLog instance (we use the delta log metadata to discover column types etc.).
2. Flink creates a Source Enumerator, and this also creates a DeltaLog/Snapshot instance.
Depending on how you submit your jobs, both things can take place on the Job Manager. Ideally we would reuse in the Source Enumerator the same DeltaLog instance that was created during job submission by the source builder, but unfortunately DeltaLog is not a serializable object... so we cannot reuse it and currently it must be recreated...
Our watermark is computed using the event timestamp from the data. If a checkpoint is triggered before there is a watermark, the checkpoint will fail and cause the job to fail after a couple of checkpoint failures
Could you clarify why "if a checkpoint is triggered before there is a watermark, the checkpoint will fail"? Why is that?
@Kenny Ma ^
I'm thinking that we could actually build a custom Flink serializer/deserializer for the DeltaLog instance. I wonder if the DeltaLogImpl and Snapshot instances have an API that would support initializing the objects from "raw" data.
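To make the DeltaLog/Snapshot cost described above more concrete, a minimal delta-standalone sketch of what the source builder and the Source Enumerator each end up doing (the table path and version are placeholders):
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import org.apache.hadoop.conf.Configuration;

public class SnapshotInitSketch {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        String tablePath = "s3a://my-bucket/my-delta-table"; // placeholder path

        // Creating the DeltaLog and resolving a Snapshot replays the _delta_log
        // (JSON commits plus the latest checkpoint) - this is the expensive part.
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);

        // The builder uses the head snapshot's metadata, e.g. to discover column types.
        Snapshot head = deltaLog.snapshot();
        System.out.println("latest version: " + head.getVersion());
        System.out.println("schema: " + head.getMetadata().getSchema());

        // A time-travel start additionally resolves the snapshot for that version.
        Snapshot startingSnapshot = deltaLog.getSnapshotForVersionAsOf(4600);
        System.out.println("starting version: " + startingSnapshot.getVersion());
    }
}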

Kenny Ma

04/13/2023, 4:28 PM
Hi @Krzysztof Chmielewski, thanks for the clarification on the Splits. Regarding the checkpoint failure, I am noticing the checkpoint will stall and time out unless the task managers start consuming data. The trade-off/workaround is to set a higher checkpoint interval to account for the time the job manager needs to rebuild the DeltaLog instance, or to allow a higher checkpoint failure tolerance.
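A minimal sketch of that workaround on the Flink side (the interval, timeout and tolerance values are placeholders to tune):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Leave the job manager enough time to rebuild the DeltaLog/Snapshot before
        // the first checkpoint, and tolerate a few timed-out checkpoints on top.
        env.enableCheckpointing(10 * 60 * 1000L);                          // 10 min interval
        env.getCheckpointConfig().setCheckpointTimeout(15 * 60 * 1000L);   // 15 min timeout
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}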
Slightly different topic: is there a way to measure lag (versions behind) for the DeltaSource? For Kafka we use the records-lag-max metric published by the FlinkKafkaConsumer.

Krzysztof Chmielewski

04/13/2023, 7:26 PM
@Kenny Ma we don't have such a metric, but this would be a great feature. Could you create a GitHub issue for this?
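Until such a metric exists, a rough external check is possible with delta-standalone; the helper below is purely hypothetical (the name, and the idea of tracking the last processed version outside the job, are illustrative assumptions rather than connector API):
import io.delta.standalone.DeltaLog;
import org.apache.hadoop.conf.Configuration;

public class VersionLagSketch {
    // Hypothetical helper: how many table versions the job is behind, given the
    // last version it is known to have processed (tracked by the operator/job).
    static long versionsBehind(Configuration hadoopConf, String tablePath, long lastProcessedVersion) {
        DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, tablePath);
        long latestVersion = deltaLog.update().getVersion(); // refresh and read the head version
        return latestVersion - lastProcessedVersion;
    }

    public static void main(String[] args) {
        long lag = versionsBehind(new Configuration(), "s3a://my-bucket/my-delta-table", 4600L);
        System.out.println("versions behind: " + lag);
    }
}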

Kenny Ma

04/13/2023, 7:58 PM
yes will do!
👍 1
@Krzysztof Chmielewski Opened issue https://github.com/delta-io/connectors/issues/537 for the versions lag metric.
👍 1

Krzysztof Chmielewski

04/14/2023, 9:45 AM
Hi @Kenny Ma
I am noticing the checkpoint will stall and time out unless the task managers start consuming data.
Well, that seems possible, since the Job Manager is busy processing the DeltaLog in the SourceEnumerator. However, I would say that the Job Manager has a separate thread to process checkpoint logic. I'm assuming that the checkpoint barrier should be sent even if there is no data to process (the task manager is waiting for a split). The SourceEnumerator is "outside" the data path; splits are not transferred to readers on the data path (from what I would expect). So this makes me think that maybe you have too high a load (not CPU usage, but load), meaning there are so many things to do (many threads on the job manager) that they are blocking the CPU cores and making other threads wait on CPU. That is regardless of the fact that the Job Manager is busy computing the Delta snapshot -> it will still be run on a separate thread by Flink. Is there any way for you to measure the CPU load on the Task Manager node and increase the number of CPUs for that node?

Kenny Ma

04/14/2023, 3:28 PM
I can increase the CPUs. They are set to 12 for the task manager and 4 for the job manager. The CPU saturation on the task manager is low, since it hasn't begun consuming data yet; the job manager is the one working hard building the DeltaLog instance, I am assuming. Once the task manager starts consuming data, the checkpoint completes within 10 seconds. I also wonder if the checkpoint barrier is not getting created until there is data. Will keep you posted!
@Krzysztof Chmielewski I will try the following settings to increase the hadoop S3 connection pool to see if it will speed up the reads through higher parallelism:
s3.connection.maximum: 55
s3.threads.max: 50
Also, is this Delta option/feature available in the Delta connector for Flink?
delta.enableFastS3AListFrom=true

Krzysztof Chmielewski

04/14/2023, 4:20 PM
I think this will be available in the next delta-storage version; currently it is in 2.3.0rc1. If you would like to use it now (an unreleased version), you would have to exclude delta-storage from delta-standalone, pull in the newer delta-storage, and use a newer Hadoop version - 3.3.1. The Hadoop version is an important part; without it you will get an "illegal access exception". Then you can add "delta.enableFastS3AListFrom" to the hadoop configuration object that you are passing to the DeltaSource builder. Please note that these things were not yet fully tested with the Delta connector; I only did some brief tests, a.k.a. it was working on my machine.
<repository>
    <id>staging-repo</id>
    <url>https://oss.sonatype.org/content/repositories/iodelta-1066/</url>
</repository>

<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-standalone_${scala.main.version}</artifactId>
    <version>${delta.version}</version>
    <exclusions>
        <exclusion>
            <groupId>io.delta</groupId>
            <artifactId>delta-storage</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-storage</artifactId>
    <version>2.3.0rc1</version>
</dependency>
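With those dependencies in place, enabling the flag could look like this sketch (continuing Kenny's earlier snippet; untested with the connector, as noted above):
Configuration hadoopConf = new Configuration();
sourceConfig.getHadoopConf().forEach(hadoopConf::setStrings);
// Needs the newer delta-storage plus Hadoop 3.3.1 combination described above.
hadoopConf.set("delta.enableFastS3AListFrom", "true");
// ...then pass hadoopConf to DeltaSource.forContinuousRowData(tablePath, hadoopConf) as before.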
👍 1

Scott Sandre (Delta Lake)

04/14/2023, 4:21 PM
Delta Lake 2.3.0 has been released btw, which includes the delta-storage JAR. https://github.com/delta-io/delta/releases/tag/v2.3.0

Krzysztof Chmielewski

04/14/2023, 4:21 PM
So I guess you can skip that <repository> part then.

Kenny Ma

04/19/2023, 4:54 PM
Hi @Krzysztof Chmielewski I ran the job with
delta.enableFastS3AListFrom=true
and didn't find any noticeable improvement in the job submission/initialization time; it is still taking a long time (>10 mins) with a long lookback/time travel. In the debug logs of the job manager, I noticed there are many calls to S3, one for each parquet file. I don't think it's reading the data, but do you know the reason for these calls? Most of the time during job initialization is spent waiting for these calls to complete before the task managers start consuming data. Is there a plan to support predicates in the Delta connector? We have a large Delta table and would like to be able to filter out parquet files that are not needed by the consumer. Thanks!

Krzysztof Chmielewski

04/19/2023, 4:55 PM
@Scott Sandre (Delta Lake) ^
btw this is for the Streaming API, I believe
👍 1

Scott Sandre (Delta Lake)

04/19/2023, 5:01 PM
didn’t find any noticeable improvement in the job submission/initialization time and it is still taking a long time (>10mins) with long lookback/time-travel
This LISTing improvement only improves LISTs. How many log files do you have in your delta table? My benchmark showed that 50,000 delta log files only took 30 seconds with the normal list implementation. So, if you're seeing 10+ minutes of initialization, that isn't from LISTing.
Is there plan to support predicate in the Delta connector
Please make an issue at delta-io/connectors !

Krzysztof Chmielewski

04/19/2023, 5:04 PM
Is standalone using parquet files (data files, not delta checkpoints) for snapshot initialization, or is it based only on log files?

Scott Sandre (Delta Lake)

04/19/2023, 5:05 PM
Also, there were two recent performance improvements added to delta-flink + delta-standalone:
https://github.com/delta-io/connectors/commit/5759de83f4ba135cd9b25328e8cac6aad02e8f47
https://github.com/delta-io/connectors/commit/f11c355649bf1182dd74b563bc399fa7f1b7fe97
These will be included in the v0.7.0 release.

Krzysztof Chmielewski

04/19/2023, 5:06 PM
The second one can help here since @Kenny Ma is using Delta Source

Kenny Ma

04/19/2023, 5:06 PM
I don't think it's from listing, because the HTTP calls are for individual parquet files.
I can get the debug logs and paste them here if that helps.
👍 1

Krzysztof Chmielewski

04/19/2023, 5:08 PM
http calls are for individual parquet files.
If standalone is not using data parquet files for snapshot initialization, then that must mean it's the readers. Readers get a Split from the Source Enumerator (the split has the path to a parquet file) and start reading that parquet file.

Scott Sandre (Delta Lake)

04/19/2023, 5:09 PM
is standalone using parquet files (data files, not delta checkpoints) for snapshot initialization, or is it based only on log files?
Snapshot initialization does not load parquet (data) files. It just replays the log (JSON, checkpoint) to find the latest metadata and protocol
And this will be significantly improved in v0.7.0
👍 1

Kenny Ma

04/19/2023, 6:48 PM
Here is a sample debug log from the job manager showing a call to a Delta table parquet file. During initialization, the job manager spends most of its time making these calls. Some of the info is redacted for privacy reasons.
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << HTTP/1.1 200 OK
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << Date: Wed, 19 Apr 2023 18:32:08 GMT
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << Last-Modified: Wed, 19 Apr 2023 07:56:12 GMT
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << Accept-Ranges: bytes
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << Content-Type: application/octet-stream
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << Server: AmazonS3
2023-04-19 18:32:07,486 DEBUG org.apache.http.headers                   [] - http-outgoing-79 << Content-Length: 222051
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.execchain.MainClientExec        [] - Connection can be kept alive for 60000 MILLISECONDS
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection [id: 79][route: {s}->https://{s3-bucket-redacted}:443] can be kept alive for 60.0 seconds
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-79: set socket timeout to 0
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection released: [id: 79][route: {s}->https://{s3-bucket-redacted}:443][total available: 1; route allocated: 1 of 96; total allocated: 1 of 96]
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection request: [route: {s}->https://{s3-bucket-redacted}:443][total available: 1; route allocated: 1 of 96; total allocated: 1 of 96]
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection leased: [id: 79][route: {s}->https://{s3-bucket-redacted}:443][total available: 0; route allocated: 1 of 96; total allocated: 1 of 96]
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-79: set socket timeout to 200000
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-79: set socket timeout to 200000
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.execchain.MainClientExec        [] - Executing request HEAD /{delta-table-parquet-file-s3-prefix-redacted}/part-00078-c0f1f33b-03f9-4245-a08f-2c6bc31de7d3.c000.snappy.parquet HTTP/1.1