Skip to content

Commit ab5d615

Browse files
authored
[FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format
1 parent 7025642 commit ab5d615

33 files changed

+2978
-11
lines changed

flink-connector-jdbc/pom.xml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ under the License.
4949
</properties>
5050

5151
<dependencies>
52+
53+
<!-- Connectors -->
54+
55+
<dependency>
56+
<groupId>org.apache.flink</groupId>
57+
<artifactId>flink-connector-base</artifactId>
58+
<version>${flink.version}</version>
59+
<scope>provided</scope>
60+
</dependency>
61+
5262
<!-- Table ecosystem -->
5363

5464
<!-- Projects depending on this project won't depend on flink-table-*. -->
@@ -169,7 +179,22 @@ under the License.
169179
<scope>test</scope>
170180
</dependency>
171181

172-
<!-- Test dependencies -->
182+
<dependency>
183+
<groupId>org.apache.flink</groupId>
184+
<artifactId>flink-connector-base</artifactId>
185+
<version>${flink.version}</version>
186+
<scope>test</scope>
187+
<type>test-jar</type>
188+
</dependency>
189+
190+
<dependency>
191+
<groupId>org.apache.flink</groupId>
192+
<artifactId>flink-connector-test-utils</artifactId>
193+
<version>${flink.version}</version>
194+
<scope>test</scope>
195+
</dependency>
196+
197+
<!-- Assertions test dependencies -->
173198

174199
<dependency>
175200
<groupId>org.assertj</groupId>

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@
9999
* @see JdbcParameterValuesProvider
100100
* @see PreparedStatement
101101
* @see DriverManager
102+
* @deprecated Please use {@link org.apache.flink.connector.jdbc.source.JdbcSource} instead. The
103+
* builder utils and parameters passing could be view {@link
104+
* org.apache.flink.connector.jdbc.source.JdbcSourceBuilder}.
102105
*/
106+
@Deprecated
103107
@Experimental
104108
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
105109
implements ResultTypeQueryable<Row> {

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.flink.connector.jdbc.datasource.connections;
1919

20-
import org.apache.flink.annotation.Internal;
20+
import org.apache.flink.annotation.PublicEvolving;
2121
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
2222
import org.apache.flink.util.Preconditions;
2323

@@ -36,7 +36,7 @@
3636

3737
/** Simple JDBC connection provider. */
3838
@NotThreadSafe
39-
@Internal
39+
@PublicEvolving
4040
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
@@ -105,7 +105,7 @@ private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
105105

106106
@Override
107107
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
108-
if (connection != null) {
108+
if (isConnectionValid()) {
109109
return connection;
110110
}
111111
if (jdbcOptions.getDriverName() == null) {
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.source;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.annotation.VisibleForTesting;
23+
import org.apache.flink.api.common.typeinfo.TypeInformation;
24+
import org.apache.flink.api.connector.source.Boundedness;
25+
import org.apache.flink.api.connector.source.Source;
26+
import org.apache.flink.api.connector.source.SourceReader;
27+
import org.apache.flink.api.connector.source.SourceReaderContext;
28+
import org.apache.flink.api.connector.source.SplitEnumerator;
29+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
30+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
31+
import org.apache.flink.configuration.Configuration;
32+
import org.apache.flink.connector.base.DeliveryGuarantee;
33+
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
34+
import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
35+
import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
36+
import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
37+
import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
38+
import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
39+
import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
40+
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
41+
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
42+
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
43+
import org.apache.flink.core.io.SimpleVersionedSerializer;
44+
import org.apache.flink.util.Preconditions;
45+
46+
import java.io.Serializable;
47+
import java.util.ArrayList;
48+
import java.util.Objects;
49+
50+
/** JDBC source. */
51+
@PublicEvolving
52+
public class JdbcSource<OUT>
53+
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
54+
ResultTypeQueryable<OUT> {
55+
56+
private final Boundedness boundedness;
57+
private final TypeInformation<OUT> typeInformation;
58+
59+
private final Configuration configuration;
60+
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
61+
62+
protected JdbcConnectionProvider connectionProvider;
63+
private final ResultExtractor<OUT> resultExtractor;
64+
private final DeliveryGuarantee deliveryGuarantee;
65+
66+
JdbcSource(
67+
Configuration configuration,
68+
JdbcConnectionProvider connectionProvider,
69+
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
70+
ResultExtractor<OUT> resultExtractor,
71+
TypeInformation<OUT> typeInformation,
72+
DeliveryGuarantee deliveryGuarantee) {
73+
this.configuration = Preconditions.checkNotNull(configuration);
74+
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
75+
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
76+
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
77+
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
78+
this.typeInformation = Preconditions.checkNotNull(typeInformation);
79+
this.boundedness = Boundedness.BOUNDED;
80+
}
81+
82+
JdbcSource(
83+
Configuration configuration,
84+
JdbcConnectionProvider connectionProvider,
85+
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
86+
ResultExtractor<OUT> resultExtractor,
87+
TypeInformation<OUT> typeInformation) {
88+
this(
89+
configuration,
90+
connectionProvider,
91+
sqlSplitEnumeratorProvider,
92+
resultExtractor,
93+
typeInformation,
94+
DeliveryGuarantee.NONE);
95+
}
96+
97+
@Override
98+
public Boundedness getBoundedness() {
99+
return boundedness;
100+
}
101+
102+
@Override
103+
public SourceReader<OUT, JdbcSourceSplit> createReader(SourceReaderContext readerContext)
104+
throws Exception {
105+
return new JdbcSourceReader<>(
106+
() ->
107+
new JdbcSourceSplitReader<>(
108+
readerContext,
109+
configuration,
110+
typeInformation,
111+
connectionProvider,
112+
deliveryGuarantee,
113+
resultExtractor),
114+
configuration,
115+
readerContext);
116+
}
117+
118+
@Override
119+
public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumerator(
120+
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
121+
return new JdbcSourceEnumerator(
122+
enumContext, sqlSplitEnumeratorProvider.create(), new ArrayList<>());
123+
}
124+
125+
@Override
126+
public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumerator(
127+
SplitEnumeratorContext<JdbcSourceSplit> enumContext,
128+
JdbcSourceEnumeratorState checkpoint)
129+
throws Exception {
130+
Serializable optionalUserDefinedSplitEnumeratorState =
131+
checkpoint.getOptionalUserDefinedSplitEnumeratorState();
132+
return new JdbcSourceEnumerator(
133+
enumContext,
134+
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
135+
checkpoint.getRemainingSplits());
136+
}
137+
138+
@Override
139+
public SimpleVersionedSerializer<JdbcSourceSplit> getSplitSerializer() {
140+
return new JdbcSourceSplitSerializer();
141+
}
142+
143+
@Override
144+
public SimpleVersionedSerializer<JdbcSourceEnumeratorState>
145+
getEnumeratorCheckpointSerializer() {
146+
return new JdbcSourceEnumStateSerializer((JdbcSourceSplitSerializer) getSplitSerializer());
147+
}
148+
149+
@Override
150+
public TypeInformation<OUT> getProducedType() {
151+
return typeInformation;
152+
}
153+
154+
public static <OUT> JdbcSourceBuilder<OUT> builder() {
155+
return new JdbcSourceBuilder<>();
156+
}
157+
158+
// ---- Visible for testing methods. ---
159+
160+
@VisibleForTesting
161+
public JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> getSqlSplitEnumeratorProvider() {
162+
return sqlSplitEnumeratorProvider;
163+
}
164+
165+
@VisibleForTesting
166+
public TypeInformation<OUT> getTypeInformation() {
167+
return typeInformation;
168+
}
169+
170+
@VisibleForTesting
171+
public Configuration getConfiguration() {
172+
return configuration;
173+
}
174+
175+
@VisibleForTesting
176+
public ResultExtractor<OUT> getResultExtractor() {
177+
return resultExtractor;
178+
}
179+
180+
@VisibleForTesting
181+
@Override
182+
public boolean equals(Object o) {
183+
if (this == o) {
184+
return true;
185+
}
186+
if (o == null || getClass() != o.getClass()) {
187+
return false;
188+
}
189+
JdbcSource<?> that = (JdbcSource<?>) o;
190+
return boundedness == that.boundedness
191+
&& Objects.equals(typeInformation, that.typeInformation)
192+
&& Objects.equals(configuration, that.configuration)
193+
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
194+
&& Objects.equals(connectionProvider, that.connectionProvider)
195+
&& Objects.equals(resultExtractor, that.resultExtractor)
196+
&& deliveryGuarantee == that.deliveryGuarantee;
197+
}
198+
}

0 commit comments

Comments
 (0)