Commit 568b5d3

Merge branch 'main' into feature/kafka4
2 parents 4575788 + 032c9dd commit 568b5d3

18 files changed: +231 -32 lines changed


.github/workflows/push_pr.yml

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 2.0.0 ]
+        flink: [ 2.1.0 ]
         jdk: [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
@@ -37,7 +37,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [ 2.0.0 ]
+        flink: [ 2.1.0 ]
         jdk: [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,9 @@
 scalastyle-output.xml
 .classpath
 .idea/
+.idea/*
+!.idea/vcs.xml
+.vscode
 .metadata
 .settings
 .project

docs/content.zh/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器使用精确
 
 Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
 该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
-当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
+当前 Kafka client 向后兼容 2.1.0 或更高版本的 Kafka broker。
 有关 Kafka 兼容性的更多细节,请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)
 
 {{< connector_artifact flink-connector-kafka kafka >}}

docs/content/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading
 
 Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
 The version of the client it uses may change between Flink releases.
-Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
 For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
 {{< connector_artifact flink-connector-kafka kafka >}}

flink-connector-kafka/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+/latest_offset_resume_topic*

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 2 additions & 4 deletions
@@ -23,8 +23,6 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
-Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
-Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
@@ -34,6 +32,8 @@ Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(or
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
+Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
+Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
 Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.getConfiguration()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
@@ -50,6 +50,4 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffs
 Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java

Lines changed: 2 additions & 2 deletions
@@ -72,8 +72,8 @@ public Set<KafkaStream> getAllStreams() {
     @Override
     public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
         try {
-            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get().keySet()
-                    .stream()
+            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get()
+                    .keySet().stream()
                     .collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
         } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException("Fetching all streams failed", e);
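The hunk above only reflows the call chain; the metadata lookup still goes through Kafka's AdminClient, where describeTopics(...).allTopicNames() resolves to a Map from topic name to TopicDescription. A minimal standalone sketch of that pattern, assuming a plain AdminClient with placeholder bootstrap servers and topic names rather than the connector's getAdminClient()/createKafkaStream helpers:

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DescribeTopicsSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // Placeholder broker address for illustration only.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // allTopicNames() yields Map<topicName, TopicDescription>; keySet().stream()
            // then feeds whatever per-topic mapping the caller needs.
            Map<String, Integer> partitionCounts =
                    admin.describeTopics(Arrays.asList("orders", "payments"))
                            .allTopicNames()
                            .get()
                            .entrySet()
                            .stream()
                            .collect(Collectors.toMap(
                                    Map.Entry::getKey,
                                    e -> e.getValue().partitions().size()));
            partitionCounts.forEach((topic, count) ->
                    System.out.println(topic + " has " + count + " partitions"));
        }
    }
}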

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java

Lines changed: 4 additions & 8 deletions
@@ -32,7 +32,6 @@
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -534,15 +533,12 @@ public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
 
         @Override
         public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
-            ListConsumerGroupOffsetsSpec groupSpec =
-                    new ListConsumerGroupOffsetsSpec()
-                            .topicPartitions(new ArrayList<>(partitions));
-            Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = Collections.singletonMap(groupId, groupSpec);
-            ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
+            ListConsumerGroupOffsetsSpec offsetsSpec =
+                    new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
             try {
                 return adminClient
-                        .listConsumerGroupOffsets(groupSpecs, options)
-                        .all()
+                        .listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
+                        .partitionsToOffsetAndMetadata()
                         .thenApply(
                                 result -> {
                                     Map<TopicPartition, Long> offsets = new HashMap<>();
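This hunk drops the default ListConsumerGroupOffsetsOptions and reads the single group's result directly via partitionsToOffsetAndMetadata() instead of all(). A self-contained sketch of that AdminClient call path, assuming placeholder bootstrap servers, group id, and topic partitions:

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetsSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        String groupId = "my-group"; // placeholder consumer group
        Collection<TopicPartition> partitions =
                Arrays.asList(new TopicPartition("orders", 0), new TopicPartition("orders", 1));

        try (AdminClient admin = AdminClient.create(props)) {
            // The spec limits the lookup to the requested partitions of one group.
            ListConsumerGroupOffsetsSpec spec =
                    new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);

            // With a single group in the map, partitionsToOffsetAndMetadata() returns that
            // group's committed offsets; partitions without a commit may map to null.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
                            .partitionsToOffsetAndMetadata()
                            .get();

            Map<TopicPartition, Long> offsets = new HashMap<>();
            committed.forEach((tp, om) -> {
                if (om != null) {
                    offsets.put(tp, om.offset());
                }
            });
            offsets.forEach((tp, off) -> System.out.println(tp + " -> " + off));
        }
    }
}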

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,6 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.header.Header;
@@ -405,7 +404,8 @@ private RowData.FieldGetter[] getFieldGetters(
         }
         DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
         if (prefix != null) {
-            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+            physicalFormatDataType =
+                    TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
         }
         return format.createRuntimeEncoder(context, physicalFormatDataType);
     }
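This hunk, and the matching one in KafkaDynamicSource below, only swaps the utility class that strips the configured key/value field prefix from the projected row type before handing it to the format; the new TableDataTypeUtils import lands elsewhere in the commit. As a rough illustration of what "strip row prefix" means, here is a hypothetical, Flink-free helper over plain field names; stripRowPrefix performs the equivalent renaming on a DataType:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StripRowPrefixSketch {
    /** Hypothetical stand-in: drop a leading prefix from every field name that carries it. */
    static List<String> stripPrefix(List<String> fieldNames, String prefix) {
        return fieldNames.stream()
                .map(name -> name.startsWith(prefix) ? name.substring(prefix.length()) : name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // With 'key.fields-prefix' = "k_", physical columns k_id and k_name are exposed
        // to the key format as id and name.
        List<String> projected = Arrays.asList("k_id", "k_name");
        System.out.println(stripPrefix(projected, "k_")); // [id, name]
    }
}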

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,6 @@
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -571,7 +570,8 @@ private KafkaRecordDeserializationSchema<RowData> createKafkaDeserializationSche
         }
         DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
         if (prefix != null) {
-            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+            physicalFormatDataType =
+                    TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
         }
         return format.createRuntimeDecoder(context, physicalFormatDataType);
     }
