Skip to content

Commit 49c878f

Browse files
authored
Merge branch 'apache:main' into feature/kafka4
2 parents 568b5d3 + cb5c5c0 commit 49c878f

File tree

15 files changed

+718
-324
lines changed

15 files changed

+718
-324
lines changed

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,14 @@ under the License.
175175
<scope>test</scope>
176176
</dependency>
177177

178-
<dependency>
178+
<dependency>
179+
<groupId>org.apache.flink</groupId>
180+
<artifactId>flink-sql-connector-kafka</artifactId>
181+
<version>${project.version}</version>
182+
<scope>test</scope>
183+
</dependency>
184+
185+
<dependency>
179186
<groupId>org.apache.flink</groupId>
180187
<artifactId>flink-connector-test-utils</artifactId>
181188
<exclusions>
@@ -255,7 +262,7 @@ under the License.
255262
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
256263
</artifactItem>
257264
</artifactItems>
258-
<ignoredUnusedDeclaredDependencies>org.apache.flink:flink-streaming-kafka-test,org.apache.flink:flink-sql-avro,org.apache.flink:flink-sql-avro-confluent-registry,org.apache.flink:flink-connector-base
265+
<ignoredUnusedDeclaredDependencies>org.apache.flink:flink-streaming-kafka-test,org.apache.flink:flink-sql-avro,org.apache.flink:flink-sql-avro-confluent-registry,org.apache.flink:flink-connector-base,org.apache.flink:flink-sql-connector-kafka
259266
</ignoredUnusedDeclaredDependencies>
260267
</configuration>
261268
</plugin>

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
2323
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
2424
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
2525
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
26-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
27-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
28-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
29-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:180)
30-
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:177)
26+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:183)
27+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:186)
28+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:182)
29+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:185)
30+
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:182)
3131
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
3232
Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
3333
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
@@ -39,9 +39,12 @@ Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apa
3939
Method <org.apache.flink.connector.kafka.source.KafkaSource.getConfiguration()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
4040
Method <org.apache.flink.connector.kafka.source.KafkaSource.getKafkaSubscriber()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
4141
Method <org.apache.flink.connector.kafka.source.KafkaSource.getStoppingOffsetsInitializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
42-
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeTopicPartitions(java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
42+
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV1(java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
43+
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV2(java.util.Collection, boolean)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
44+
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV3(org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
4345
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.deepCopyProperties(java.util.Properties, java.util.Properties)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
4446
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getPartitionChange(java.util.Set)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
47+
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getPendingPartitionSplitAssignment()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
4548
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSplitOwner(org.apache.kafka.common.TopicPartition, int)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
4649
Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.consumer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaPartitionSplitReader.java:0)
4750
Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.setConsumerClientRack(java.util.Properties, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaPartitionSplitReader.java:0)

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,13 @@
3535
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
3636
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
3737
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
38-
import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
38+
import org.apache.flink.connector.kafka.source.enumerator.SplitAndAssignmentStatus;
3939
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
4040
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
4141
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
4242
import org.apache.flink.util.Preconditions;
4343

4444
import org.apache.kafka.common.KafkaException;
45-
import org.apache.kafka.common.TopicPartition;
4645
import org.slf4j.Logger;
4746
import org.slf4j.LoggerFactory;
4847

@@ -174,8 +173,8 @@ public DynamicKafkaSourceEnumerator(
174173
dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
175174
this.latestClusterTopicsMap.put(
176175
clusterEnumState.getKey(),
177-
clusterEnumState.getValue().assignedPartitions().stream()
178-
.map(TopicPartition::topic)
176+
clusterEnumState.getValue().assignedSplits().stream()
177+
.map(KafkaPartitionSplit::getTopic)
179178
.collect(Collectors.toSet()));
180179

181180
createEnumeratorWithAssignedTopicPartitions(
@@ -291,9 +290,9 @@ private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams
291290
final Set<String> activeTopics = activeClusterTopics.getValue();
292291

293292
// filter out removed topics
294-
Set<TopicPartitionAndAssignmentStatus> partitions =
295-
kafkaSourceEnumState.partitions().stream()
296-
.filter(tp -> activeTopics.contains(tp.topicPartition().topic()))
293+
Set<SplitAndAssignmentStatus> partitions =
294+
kafkaSourceEnumState.splits().stream()
295+
.filter(tp -> activeTopics.contains(tp.split().getTopic()))
297296
.collect(Collectors.toSet());
298297

299298
newKafkaSourceEnumState =

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one or more
3-
* contributor license agreements. See the NOTICE file distributed with
4-
* this work for additional information regarding copyright ownership.
5-
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
89
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1011
*
1112
* Unless required by applicable law or agreed to in writing, software
1213
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,6 +27,7 @@
2627
import org.apache.flink.util.IOUtils;
2728

2829
import org.apache.kafka.clients.producer.ProducerConfig;
30+
import org.apache.kafka.common.errors.InterruptException;
2931
import org.apache.kafka.common.errors.InvalidTxnStateException;
3032
import org.apache.kafka.common.errors.ProducerFencedException;
3133
import org.apache.kafka.common.errors.RetriableException;
@@ -125,6 +127,17 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
125127
e);
126128
handleFailedTransaction(producer);
127129
request.signalFailedWithKnownReason(e);
130+
} catch (InterruptException e) {
131+
// note that we do not attempt to recover from this exception; producer is likely
132+
// left in an inconsistent state
133+
LOG.info(
134+
"Committing transaction ({}) was interrupted. This most likely happens because the task is being cancelled.",
135+
request,
136+
e);
137+
// reset the interrupt flag that is set when InterruptException is created
138+
Thread.interrupted();
139+
// propagate interruption through java.lang.InterruptedException instead
140+
throw new InterruptedException(e.getMessage());
128141
} catch (Exception e) {
129142
LOG.error(
130143
"Transaction ({}) encountered error and data has been potentially lost.",

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@ public enum AssignmentStatus {
2626

2727
/** Partitions that have been assigned to readers. */
2828
ASSIGNED(0),
29-
/**
30-
* The partitions that have been discovered during initialization but not assigned to readers
31-
* yet.
32-
*/
33-
UNASSIGNED_INITIAL(1);
29+
/** The partitions that have been discovered but not assigned to readers yet. */
30+
UNASSIGNED(1);
3431
private final int statusCode;
3532

3633
AssignmentStatus(int statusCode) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,76 +19,73 @@
1919
package org.apache.flink.connector.kafka.source.enumerator;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
2223

23-
import org.apache.kafka.common.TopicPartition;
24-
24+
import java.util.Collection;
2525
import java.util.HashSet;
2626
import java.util.Set;
2727
import java.util.stream.Collectors;
2828

2929
/** The state of Kafka source enumerator. */
3030
@Internal
3131
public class KafkaSourceEnumState {
32-
/** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
33-
private final Set<TopicPartitionAndAssignmentStatus> partitions;
32+
/** Splits with status: ASSIGNED or UNASSIGNED. */
33+
private final Set<SplitAndAssignmentStatus> splits;
3434
/**
3535
* this flag will be marked as true if initial partitions are discovered after enumerator
3636
* starts.
3737
*/
3838
private final boolean initialDiscoveryFinished;
3939

4040
public KafkaSourceEnumState(
41-
Set<TopicPartitionAndAssignmentStatus> partitions, boolean initialDiscoveryFinished) {
42-
this.partitions = partitions;
41+
Set<SplitAndAssignmentStatus> splits, boolean initialDiscoveryFinished) {
42+
this.splits = splits;
4343
this.initialDiscoveryFinished = initialDiscoveryFinished;
4444
}
4545

4646
public KafkaSourceEnumState(
47-
Set<TopicPartition> assignPartitions,
48-
Set<TopicPartition> unassignedInitialPartitions,
47+
Collection<KafkaPartitionSplit> assignedSplits,
48+
Collection<KafkaPartitionSplit> unassignedSplits,
4949
boolean initialDiscoveryFinished) {
50-
this.partitions = new HashSet<>();
51-
partitions.addAll(
52-
assignPartitions.stream()
50+
this.splits = new HashSet<>();
51+
splits.addAll(
52+
assignedSplits.stream()
5353
.map(
5454
topicPartition ->
55-
new TopicPartitionAndAssignmentStatus(
55+
new SplitAndAssignmentStatus(
5656
topicPartition, AssignmentStatus.ASSIGNED))
5757
.collect(Collectors.toSet()));
58-
partitions.addAll(
59-
unassignedInitialPartitions.stream()
58+
splits.addAll(
59+
unassignedSplits.stream()
6060
.map(
6161
topicPartition ->
62-
new TopicPartitionAndAssignmentStatus(
63-
topicPartition,
64-
AssignmentStatus.UNASSIGNED_INITIAL))
62+
new SplitAndAssignmentStatus(
63+
topicPartition, AssignmentStatus.UNASSIGNED))
6564
.collect(Collectors.toSet()));
6665
this.initialDiscoveryFinished = initialDiscoveryFinished;
6766
}
6867

69-
public Set<TopicPartitionAndAssignmentStatus> partitions() {
70-
return partitions;
68+
public Set<SplitAndAssignmentStatus> splits() {
69+
return splits;
7170
}
7271

73-
public Set<TopicPartition> assignedPartitions() {
74-
return filterPartitionsByAssignmentStatus(AssignmentStatus.ASSIGNED);
72+
public Collection<KafkaPartitionSplit> assignedSplits() {
73+
return filterByAssignmentStatus(AssignmentStatus.ASSIGNED);
7574
}
7675

77-
public Set<TopicPartition> unassignedInitialPartitions() {
78-
return filterPartitionsByAssignmentStatus(AssignmentStatus.UNASSIGNED_INITIAL);
76+
public Collection<KafkaPartitionSplit> unassignedSplits() {
77+
return filterByAssignmentStatus(AssignmentStatus.UNASSIGNED);
7978
}
8079

8180
public boolean initialDiscoveryFinished() {
8281
return initialDiscoveryFinished;
8382
}
8483

85-
private Set<TopicPartition> filterPartitionsByAssignmentStatus(
84+
private Collection<KafkaPartitionSplit> filterByAssignmentStatus(
8685
AssignmentStatus assignmentStatus) {
87-
return partitions.stream()
88-
.filter(
89-
partitionWithStatus ->
90-
partitionWithStatus.assignmentStatus().equals(assignmentStatus))
91-
.map(TopicPartitionAndAssignmentStatus::topicPartition)
92-
.collect(Collectors.toSet());
86+
return splits.stream()
87+
.filter(split -> split.assignmentStatus().equals(assignmentStatus))
88+
.map(SplitAndAssignmentStatus::split)
89+
.collect(Collectors.toList());
9390
}
9491
}

0 commit comments

Comments
 (0)