Skip to content

Commit 84e20b0

Browse files
Merge branch 'main' into jdbc_spanner_connector
# Conflicts: # flink-connector-jdbc/pom.xml
2 parents 3c7d225 + f697986 commit 84e20b0

File tree

88 files changed

+203
-5867
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+203
-5867
lines changed

.github/workflows/backwards_compatibility.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ jobs:
2929
runs-on: ubuntu-latest
3030
strategy:
3131
matrix:
32-
flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
33-
jdk: [8, 11, 17]
32+
flink: [2.0-SNAPSHOT, 2.1-SNAPSHOT]
33+
jdk: [17]
3434

3535
env:
3636
MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120

.github/workflows/push_pr.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: [1.20.0]
32-
jdk: [ '8, 11, 17, 21' ]
31+
flink: [2.0.0]
32+
jdk: [ '17' ]
3333
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3434
with:
3535
flink_version: ${{ matrix.flink }}

.github/workflows/weekly.yml

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ jobs:
3030
strategy:
3131
matrix:
3232
flink_branches: [{
33-
flink: 1.19-SNAPSHOT,
34-
jdk: '8, 11, 17, 21',
33+
flink: 2.1-SNAPSHOT,
34+
jdk: '17',
3535
branch: main
36-
},
37-
{
38-
flink: 1.20-SNAPSHOT,
39-
jdk: '8, 11, 17, 21',
36+
}, {
37+
flink: 2.0-SNAPSHOT,
38+
jdk: '17',
39+
branch: main
40+
}, {
41+
flink: 2.0.0,
42+
jdk: '17',
4043
branch: main
4144
}, {
4245
flink: 1.19.1,

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "tools/releasing/shared"]
22
path = tools/releasing/shared
3-
url = https://github.com/apache/flink-connector-shared-utils
4-
branch = release_utils
3+
url = git@github.com:apache/flink-connector-shared-utils.git
4+
branch = release_utils

flink-connector-jdbc-architecture/pom.xml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>org.apache.flink</groupId>
88
<artifactId>flink-connector-jdbc-parent</artifactId>
9-
<version>3.3-SNAPSHOT</version>
9+
<version>4.0-SNAPSHOT</version>
1010
</parent>
1111

1212
<artifactId>flink-connector-jdbc-architecture</artifactId>
@@ -36,12 +36,6 @@
3636
</dependency>
3737

3838
<!-- Flink Jdbc Modules To Test -->
39-
<dependency>
40-
<groupId>org.apache.flink</groupId>
41-
<artifactId>flink-connector-jdbc</artifactId>
42-
<version>${project.version}</version>
43-
<scope>test</scope>
44-
</dependency>
4539
<dependency>
4640
<groupId>org.apache.flink</groupId>
4741
<artifactId>flink-connector-jdbc-core</artifactId>

flink-connector-jdbc-backward-compatibility/pom.xml

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.apache.flink</groupId>
99
<artifactId>flink-connector-jdbc-parent</artifactId>
10-
<version>3.3-SNAPSHOT</version>
10+
<version>4.0-SNAPSHOT</version>
1111
</parent>
1212

1313
<groupId>org.apache.flink</groupId>
@@ -17,6 +17,7 @@
1717
<packaging>jar</packaging>
1818

1919
<properties>
20+
<postgres.version>42.7.3</postgres.version>
2021
<surefire.module.config>
2122
--add-opens=java.base/java.util=ALL-UNNAMED
2223
--add-opens=java.base/java.lang=ALL-UNNAMED
@@ -58,17 +59,30 @@
5859
</dependency>
5960
<dependency>
6061
<groupId>org.apache.flink</groupId>
61-
<artifactId>flink-connector-jdbc</artifactId>
62+
<artifactId>flink-connector-jdbc-core</artifactId>
63+
<version>${project.version}</version>
64+
<type>test-jar</type>
65+
<scope>test</scope>
66+
</dependency>
67+
<dependency>
68+
<groupId>org.apache.flink</groupId>
69+
<artifactId>flink-connector-jdbc-postgres</artifactId>
6270
<version>${project.version}</version>
6371
<scope>test</scope>
6472
</dependency>
6573
<dependency>
6674
<groupId>org.apache.flink</groupId>
67-
<artifactId>flink-connector-jdbc</artifactId>
75+
<artifactId>flink-connector-jdbc-postgres</artifactId>
6876
<version>${project.version}</version>
6977
<type>test-jar</type>
7078
<scope>test</scope>
7179
</dependency>
80+
<dependency>
81+
<groupId>org.postgresql</groupId>
82+
<artifactId>postgresql</artifactId>
83+
<version>${postgres.version}</version>
84+
<scope>test</scope>
85+
</dependency>
7286
<dependency>
7387
<groupId>org.testcontainers</groupId>
7488
<artifactId>postgresql</artifactId>

flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@
1818

1919
package org.apache.flink.connector.jdbc.backward.compatibility;
2020

21-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2221
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
2322
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
2423
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
25-
import org.apache.flink.connector.jdbc.JdbcSink;
24+
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
2625
import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
2726
import org.apache.flink.connector.jdbc.testutils.TableManaged;
2827
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
2928
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3029
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
30+
import org.apache.flink.streaming.util.RestartStrategyUtils;
3131
import org.apache.flink.test.junit5.MiniClusterExtension;
3232

3333
import org.junit.jupiter.api.Test;
@@ -72,7 +72,7 @@ public List<TableManaged> getManagedTables() {
7272
@Test
7373
public void testAtLeastOnce() throws Exception {
7474
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
75-
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
75+
RestartStrategyUtils.configureNoRestartStrategy(env);
7676
env.setParallelism(1);
7777

7878
assertResult(new ArrayList<>());
@@ -98,7 +98,7 @@ public void testAtLeastOnce() throws Exception {
9898
@Test
9999
public void testExactlyOnce() throws Exception {
100100
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
101-
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
101+
RestartStrategyUtils.configureNoRestartStrategy(env);
102102
env.setParallelism(1);
103103

104104
assertResult(new ArrayList<>());

flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@
1919
package org.apache.flink.connector.jdbc.backward.compatibility;
2020

2121
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
22-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2322
import org.apache.flink.api.common.typeinfo.TypeInformation;
2423
import org.apache.flink.connector.jdbc.JdbcTestFixture;
24+
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
25+
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
2526
import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
26-
import org.apache.flink.connector.jdbc.source.JdbcSource;
27-
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
2827
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
2928
import org.apache.flink.connector.jdbc.testutils.TableManaged;
3029
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
3130
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3231
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
33-
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
32+
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
33+
import org.apache.flink.streaming.util.RestartStrategyUtils;
3434
import org.apache.flink.test.junit5.MiniClusterExtension;
3535

3636
import org.junit.jupiter.api.BeforeEach;
@@ -101,7 +101,7 @@ void init() throws SQLException {
101101
@Test
102102
void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
103103
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
104-
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
104+
RestartStrategyUtils.configureNoRestartStrategy(env);
105105
env.setParallelism(1);
106106
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
107107
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -122,7 +122,7 @@ void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
122122
@Test
123123
void testReadWithoutParallelismWithParamsProvider() throws Exception {
124124
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
125-
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
125+
RestartStrategyUtils.configureNoRestartStrategy(env);
126126
env.setParallelism(1);
127127
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
128128
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -146,7 +146,7 @@ void testReadWithoutParallelismWithParamsProvider() throws Exception {
146146
@Test
147147
void testReadWithParallelismWithoutParamsProvider() throws Exception {
148148
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
149-
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
149+
RestartStrategyUtils.configureNoRestartStrategy(env);
150150
env.setParallelism(2);
151151
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
152152
JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -167,7 +167,7 @@ void testReadWithParallelismWithoutParamsProvider() throws Exception {
167167
@Test
168168
void testReadWithParallelismWithParamsProvider() throws Exception {
169169
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
170-
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
170+
RestartStrategyUtils.configureNoRestartStrategy(env);
171171
env.setParallelism(2);
172172
JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
173173
JdbcSource.<JdbcTestFixture.TestEntry>builder()

flink-connector-jdbc-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ under the License.
2626
<parent>
2727
<groupId>org.apache.flink</groupId>
2828
<artifactId>flink-connector-jdbc-parent</artifactId>
29-
<version>3.3-SNAPSHOT</version>
29+
<version>4.0-SNAPSHOT</version>
3030
</parent>
3131

3232
<artifactId>flink-connector-jdbc-core</artifactId>

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,10 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
298298
pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
299299
Schema tableSchema = schemaBuilder.build();
300300

301-
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath));
301+
return CatalogTable.newBuilder()
302+
.schema(tableSchema)
303+
.options(getOptions(tablePath))
304+
.build();
302305
} catch (Exception e) {
303306
throw new CatalogException(
304307
String.format("Failed getting table %s", tablePath.getFullName()), e);

0 commit comments

Comments
 (0)