Kenny Ma
04/07/2023, 12:00 AM
Krzysztof Chmielewski
04/07/2023, 12:25 PM
Kenny Ma
04/07/2023, 4:27 PM
I can see in the logs from io.delta.standalone.internal.DeltaLogImpl that it is starting and loading the most recent Spark checkpoint, despite trying to start from a much older version. I will try to enable debugging and do more troubleshooting on our end. Thanks!
Scott Sandre (Delta Lake)
04/07/2023, 4:29 PM
Krzysztof Chmielewski
04/07/2023, 4:31 PM
Scott Sandre (Delta Lake)
04/07/2023, 4:41 PM
Kenny Ma
04/07/2023, 4:43 PM
Configuration hadoopConf = new Configuration();
sourceConfig.getHadoopConf().forEach(hadoopConf::setStrings);

DeltaSourceBuilderBase<RowData, ?> deltaSourceBuilder =
        DeltaSource.forContinuousRowData(
                new Path(sourceConfig.getTablePath()), hadoopConf)
            .updateCheckIntervalMillis(Duration.ofSeconds(10).toMillis());
((RowDataContinuousDeltaSourceBuilder) deltaSourceBuilder)
        .startingVersion(sourceConfig.getStartingVersion());
DeltaSource<RowData> deltaSource = deltaSourceBuilder.build();
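For readers following along: the cast above should be avoidable, since in the published connector API DeltaSource.forContinuousRowData returns a RowDataContinuousDeltaSourceBuilder directly. A minimal sketch of the same wiring as one fluent chain (sourceConfig and its getters come from the snippet above; everything else is the connector's public API):

import java.time.Duration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import io.delta.flink.source.DeltaSource;

// Copy the user-supplied Hadoop settings into a fresh Configuration.
Configuration hadoopConf = new Configuration();
sourceConfig.getHadoopConf().forEach(hadoopConf::setStrings);

// forContinuousRowData returns RowDataContinuousDeltaSourceBuilder, so
// startingVersion can be set in the same chain, no cast needed.
DeltaSource<RowData> deltaSource =
        DeltaSource.forContinuousRowData(
                new Path(sourceConfig.getTablePath()), hadoopConf)
            .updateCheckIntervalMillis(Duration.ofSeconds(10).toMillis())
            .startingVersion(sourceConfig.getStartingVersion())
            .build();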
Krzysztof Chmielewski
04/07/2023, 4:46 PM
Kenny Ma
04/07/2023, 4:47 PM
Krzysztof Chmielewski
04/07/2023, 4:49 PM
Kenny Ma
04/07/2023, 4:49 PM
fs.s3a.aws.credentials.provider
Krzysztof Chmielewski
04/07/2023, 4:51 PM
Kenny Ma
04/07/2023, 4:53 PM
Krzysztof Chmielewski
04/07/2023, 4:54 PM
Kenny Ma
04/07/2023, 4:54 PM
Krzysztof Chmielewski
04/07/2023, 4:54 PM
Kenny Ma
04/07/2023, 4:55 PM
I see io.delta.standalone.internal.DeltaLogImpl in the Flink log. Why is it trying to load a Spark checkpoint?
Krzysztof Chmielewski
04/07/2023, 4:56 PM
Kenny Ma
04/07/2023, 4:57 PM
Scott Sandre (Delta Lake)
04/07/2023, 5:03 PM
Kenny Ma
04/12/2023, 10:49 PM
Krzysztof Chmielewski
04/13/2023, 3:24 PM
"We have 100 splits in a version"
This will confuse the Databricks/Delta guys, since they don't know what a Split is; it's Flink-specific. It would be better to say "100 parquet files per version". I know the details of the connector, so that's why it's clear to me. Regarding:
"The job mgr is also trying to fetch data on job submission"
Yes. During job submission, the Delta source creates what is called a DeltaLog instance, which has to compute a table Snapshot instance; both types come from the delta-standalone library. Unfortunately, computing the snapshot can be a CPU/memory-intensive operation for large tables, since delta-standalone has to read all the delta log JSON files and compute the overall table snapshot. When you submit a job with a Delta source, two things happen:
1. Your main method is executed: your code calls the DeltaSourceBuilder, which creates a DeltaLog instance (we use the delta log metadata to discover column types etc.).
2. Flink creates a source enumerator, which also creates a DeltaLog/Snapshot instance.
Depending on how you submit your jobs, both things can take place on a Job Manager. Ideally we would reuse, in the source enumerator, the same DeltaLog instance the source builder created during job submission, but unfortunately DeltaLog is not a serializable object and we cannot reuse it; currently it must be recreated.
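For reference, a minimal sketch of the delta-standalone calls being described here (the table path is a placeholder; DeltaLog.forTable, snapshot(), and getMetadata() are the library's public API):

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.types.StructType;
import org.apache.hadoop.conf.Configuration;

// Creating the DeltaLog replays _delta_log (JSON commits plus checkpoint files)
// to compute the table Snapshot; for large tables this is the expensive step.
DeltaLog deltaLog = DeltaLog.forTable(new Configuration(), "s3a://bucket/path/to/table");
Snapshot snapshot = deltaLog.snapshot();

// The source builder only needs metadata from the snapshot (column types etc.)...
StructType schema = snapshot.getMetadata().getSchema();

// ...but DeltaLog is not serializable, so the source enumerator cannot receive
// this instance and must repeat DeltaLog.forTable(...) itself, paying the cost again.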
"Our watermark is computed using the event timestamp from data. If checkpoint is triggered before there is a watermark, the checkpoint will fail and cause the job to fail after couple of checkpoint failures"
Could you clarify why "if checkpoint is triggered before there is a watermark, the checkpoint will fail"? Why is that?
Kenny Ma
04/13/2023, 4:28 PM
Krzysztof Chmielewski
04/13/2023, 7:26 PM
Kenny Ma
04/13/2023, 7:58 PM
Krzysztof Chmielewski
04/14/2023, 9:45 AM
"I am noticing checkpoint will stall and timeout unless the task mgrs start consuming data."
Well, that seems possible, since the Job Manager is busy processing the DeltaLog in the SourceEnumerator. However, I would say the Job Manager has a separate thread for its checkpoint logic. I'm assuming the checkpoint barrier should be sent even if there is no data to process (the task manager is waiting for a split). The SourceEnumerator is "outside" the data path; splits are not transferred to readers on the data path (from what I would expect). Given that, I think you may simply have too high a load (not CPU usage, but load): there are so many things to do (many threads on the job manager) that they are blocking CPU cores and making other threads wait on CPU. That is regardless of the fact that the Job Manager is busy computing the Delta snapshot; Flink will still run that on a separate thread. Is there any way for you to measure the CPU load on the Task Manager node and increase the number of CPUs for that node?
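As an aside on "cause the job to fail after couple of checkpoint failures" above: that threshold is plain Flink configuration, not connector behavior. A hedged sketch of giving a slow-initializing job more headroom (values are illustrative only):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // checkpoint every 60s

// Allow long-running checkpoints while the enumerator is still computing the snapshot...
env.getCheckpointConfig().setCheckpointTimeout(15 * 60_000L);
// ...and tolerate a few failed/timed-out checkpoints before failing the whole job.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);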
Kenny Ma
04/14/2023, 3:28 PM
s3.connection.maximum: 55
s3.threads.max: 50
Also, is this delta option/feature available in the Delta connector for Flink?
delta.enableFastS3AListFrom=true
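A hedged sketch of how these settings would be passed through the Hadoop Configuration handed to the source builder; the assumption is that Flink's "s3.*" keys are its shorthand for Hadoop's "fs.s3a.*" keys, and that delta.enableFastS3AListFrom is read by the delta-storage S3 LogStore from the same Configuration (values illustrative):

import org.apache.hadoop.conf.Configuration;

Configuration hadoopConf = new Configuration();
// S3A client sizing (Hadoop-level equivalents of the Flink "s3.*" keys above):
hadoopConf.set("fs.s3a.connection.maximum", "55");
hadoopConf.set("fs.s3a.threads.max", "50");
// Opt into the faster S3A LIST implementation (Delta 2.3+ delta-storage):
hadoopConf.set("delta.enableFastS3AListFrom", "true");
// Then hand hadoopConf to DeltaSource.forContinuousRowData(tablePath, hadoopConf) as above.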
Krzysztof Chmielewski
04/14/2023, 4:20 PM
<repository>
  <id>staging-repo</id>
  <url>https://oss.sonatype.org/content/repositories/iodelta-1066/</url>
</repository>

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-standalone_${scala.main.version}</artifactId>
  <version>${delta.version}</version>
  <exclusions>
    <exclusion>
      <groupId>io.delta</groupId>
      <artifactId>delta-storage</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-storage</artifactId>
  <version>2.3.0rc1</version>
</dependency>
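(Presumably the exclusion is there so the delta-storage jar that delta-standalone would otherwise pull in transitively stays off the classpath, letting the 2.3.0rc1 staging build carrying the new S3 LIST code be the copy that actually loads.)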
Scott Sandre (Delta Lake)
04/14/2023, 4:21 PM
Krzysztof Chmielewski
04/14/2023, 4:21 PM
Kenny Ma
04/19/2023, 4:54 PM
We tried delta.enableFastS3AListFrom=true and didn't find any noticeable improvement in the job submission/initialization time and it is still taking a long time (>10mins) with long lookback/time-travel. In the debug logging of the job mgr, I noticed there are many calls to S3, one for each parquet file. I don't think it's reading the data, but do you know the reason for these calls? Most of the time spent during job initialization is waiting for these calls to complete before the task mgrs start consuming data.
Is there a plan to support predicates in the Delta connector? We have a large Delta table and would like to be able to filter out files/parquets that are not needed by the consumer. Thanks!
Krzysztof Chmielewski
04/19/2023, 4:55 PM
Scott Sandre (Delta Lake)
04/19/2023, 5:01 PM
"didn't find any noticeable improvement in the job submission/initialization time and it is still taking a long time (>10mins) with long lookback/time-travel"
This LISTing improvement only improves LISTs. How many log files do you have in your delta table? My benchmark showed that 50,000 delta log files took only 30 seconds with the normal list implementation. So, if you're seeing 10+ min initialization, that isn't from LISTing.
"Is there a plan to support predicates in the Delta connector?"
Please open an issue at delta-io/connectors!
Krzysztof Chmielewski
04/19/2023, 5:04 PM
Scott Sandre (Delta Lake)
04/19/2023, 5:05 PM
Krzysztof Chmielewski
04/19/2023, 5:06 PM
Kenny Ma
04/19/2023, 5:06 PM
Krzysztof Chmielewski
04/19/2023, 5:08 PM
"http calls are for individual parquet files."
If standalone is not using data parquet files for snapshot initialization, then that must mean it's the readers. Readers get a Split from the source enumerator (the split has the path to a parquet file) and then start reading that parquet file.
Scott Sandre (Delta Lake)
04/19/2023, 5:09 PM
"is standalone using parquet files (data files, not delta checkpoint) for snapshot initialization, or is it based only on log files?"
Snapshot initialization does not load parquet (data) files. It just replays the log (JSON, checkpoint) to find the latest metadata and protocol.
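To make that concrete, a small delta-standalone sketch (public API; the table path is a placeholder): the Snapshot is built from _delta_log alone, and data-file paths only surface once something iterates the scan, which is when readers start issuing per-parquet requests like the HEADs in the log below.

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import org.apache.hadoop.conf.Configuration;

DeltaLog log = DeltaLog.forTable(new Configuration(), "s3a://bucket/table");
Snapshot snapshot = log.snapshot(); // replays _delta_log JSON + checkpoint files only

// Data parquet files enter the picture here: each AddFile carries the path
// that a Flink reader later opens, one request per file.
try (CloseableIterator<AddFile> files = snapshot.scan().getFiles()) {
    while (files.hasNext()) {
        System.out.println(files.next().getPath());
    }
}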
Kenny Ma
04/19/2023, 6:48 PM
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers [] - http-outgoing-79 << HTTP/1.1 200 OK
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers [] - http-outgoing-79 << Date: Wed, 19 Apr 2023 18:32:08 GMT
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers [] - http-outgoing-79 << Last-Modified: Wed, 19 Apr 2023 07:56:12 GMT
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers [] - http-outgoing-79 << Accept-Ranges: bytes
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers [] - http-outgoing-79 << Content-Type: application/octet-stream
2023-04-19 18:32:07,485 DEBUG org.apache.http.headers [] - http-outgoing-79 << Server: AmazonS3
2023-04-19 18:32:07,486 DEBUG org.apache.http.headers [] - http-outgoing-79 << Content-Length: 222051
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Connection can be kept alive for 60000 MILLISECONDS
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection [id: 79][route: {s}->https://{s3-bucket-redacted}:443] can be kept alive for 60.0 seconds
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-79: set socket timeout to 0
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection released: [id: 79][route: {s}->https://{s3-bucket-redacted}:443][total available: 1; route allocated: 1 of 96; total allocated: 1 of 96]
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection request: [route: {s}->https://{s3-bucket-redacted}:443][total available: 1; route allocated: 1 of 96; total allocated: 1 of 96]
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection leased: [id: 79][route: {s}->https://{s3-bucket-redacted}:443][total available: 0; route allocated: 1 of 96; total allocated: 1 of 96]
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-79: set socket timeout to 200000
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-79: set socket timeout to 200000
2023-04-19 18:32:07,486 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Executing request HEAD /{delta-table-parquet-file-s3-prefix-redacted}/part-00078-c0f1f33b-03f9-4245-a08f-2c6bc31de7d3.c000.snappy.parquet HTTP/1.1