Adrian Smith
01/10/2023, 8:18 AM
0.6.0 along with Flink 1.15.3, and things run super smoothly. Thanks for the release!
However, I have noticed metric name collisions from the connector:
Name collision: Group already contains a Metric with the name 'DeltaSinkBytesWritten'. Metric will not be reported.[my_application_name, flink, operator]
A colleague of mine sent me this thread, https://lists.apache.org/thread/xl02b08n823ct9lsnhosv2x2zlskm0cs, which states that "To prevent this warning from occurring you will have to give every operator a unique name."
The Flink application I have set up is quite simple and adheres to the above:
val source = streamExecutionEnvironment
    .fromSource(
        KafkaRecordSource.create(configuration.sourceConfiguration, configuration.kafkaConfiguration),
        WatermarkStrategy.forMonotonousTimestamps(),
        configuration.sourceConfiguration.kafkaTopic
    )
    .flatMap(KafkaConsumerRecordRowMapper())
    .name("Kafka Consumer Record Row Mapper")
...
source.sinkTo(sink).name("Delta Table Sink")
...
streamExecutionEnvironment.execute("[application name] Kafka to Delta Lake")
Note that I have prefixed the operator metrics scope:
...
metrics.scope.operator: my_application_name.flink.operator
The thread the log comes from is "Source: [redacted-kafka-topic-name] -> Kafka Consumer Record Row Mapper -> Delta Table Sink: Writer -> Delta Table Sink: Committer (3/3)#0", which indicates (to my understanding) that it's not the Global Committer. The Kafka topic has three partitions, hence the source operator having three subtasks in Flink (see screenshot). Is this a bug, or is there something I may have missed? 🙂
Neha
03/09/2023, 8:21 AM
Neha
04/06/2023, 1:19 PM
André Midea Jasiskis
08/16/2023, 10:18 AM
<!-- Needed for Flink Table/SQL API -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime_${scala.main.version}</artifactId>
    <version>${flink-version}</version>
    <scope>provided</scope>
</dependency>
<!-- End needed for Flink Table/SQL API -->
<!-- Needed for AWS S3 support -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<!-- End needed for AWS S3 support -->
Sairam Yeturi
08/17/2023, 9:51 AM
Aditya Goyal
08/22/2023, 1:40 PM