Skip to content

Commit 63a2301

Browse files
committed
[FLINK-38733] Add new SplitterEnumerator on JdbcSource
1 parent ea84be6 commit 63a2301

21 files changed

+978
-67
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
3636
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
3737
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
38-
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
38+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator;
39+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
3940
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
4041
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
4142
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
@@ -71,12 +72,13 @@ public class JdbcSource<OUT>
7172
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
7273

7374
private final Configuration configuration;
74-
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
75+
private final SplitterEnumerator splitterEnumerator;
7576

7677
protected JdbcConnectionProvider connectionProvider;
7778
private final ResultExtractor<OUT> resultExtractor;
7879
private final DeliveryGuarantee deliveryGuarantee;
7980

81+
@Deprecated
8082
JdbcSource(
8183
Configuration configuration,
8284
JdbcConnectionProvider connectionProvider,
@@ -85,9 +87,27 @@ public class JdbcSource<OUT>
8587
TypeInformation<OUT> typeInformation,
8688
@Nullable DeliveryGuarantee deliveryGuarantee,
8789
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
90+
this(
91+
configuration,
92+
connectionProvider,
93+
new JdbcSqlSplitterEnumerator(sqlSplitEnumeratorProvider),
94+
resultExtractor,
95+
typeInformation,
96+
deliveryGuarantee,
97+
continuousUnBoundingSettings);
98+
}
99+
100+
JdbcSource(
101+
Configuration configuration,
102+
JdbcConnectionProvider connectionProvider,
103+
SplitterEnumerator splitterEnumerator,
104+
ResultExtractor<OUT> resultExtractor,
105+
TypeInformation<OUT> typeInformation,
106+
@Nullable DeliveryGuarantee deliveryGuarantee,
107+
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
88108
this.configuration = Preconditions.checkNotNull(configuration);
89109
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
90-
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
110+
this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator);
91111
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
92112
this.deliveryGuarantee =
93113
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
@@ -125,7 +145,8 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumera
125145
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
126146
return new JdbcSourceEnumerator(
127147
enumContext,
128-
sqlSplitEnumeratorProvider.create(),
148+
splitterEnumerator,
149+
connectionProvider,
129150
continuousUnBoundingSettings,
130151
new ArrayList<>());
131152
}
@@ -139,7 +160,8 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
139160
checkpoint.getOptionalUserDefinedSplitEnumeratorState();
140161
return new JdbcSourceEnumerator(
141162
enumContext,
142-
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
163+
splitterEnumerator.restoreState(optionalUserDefinedSplitEnumeratorState),
164+
connectionProvider,
143165
continuousUnBoundingSettings,
144166
checkpoint.getRemainingSplits());
145167
}
@@ -167,8 +189,8 @@ public static <OUT> JdbcSourceBuilder<OUT> builder() {
167189
// ---- Visible for testing methods. ---
168190

169191
@VisibleForTesting
170-
public JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> getSqlSplitEnumeratorProvider() {
171-
return sqlSplitEnumeratorProvider;
192+
public SplitterEnumerator getSplitterEnumerator() {
193+
return splitterEnumerator;
172194
}
173195

174196
@VisibleForTesting
@@ -204,7 +226,7 @@ public boolean equals(Object o) {
204226
return boundedness == that.boundedness
205227
&& Objects.equals(typeInformation, that.typeInformation)
206228
&& Objects.equals(configuration, that.configuration)
207-
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
229+
&& Objects.equals(splitterEnumerator, that.splitterEnumerator)
208230
&& Objects.equals(connectionProvider, that.connectionProvider)
209231
&& Objects.equals(resultExtractor, that.resultExtractor)
210232
&& deliveryGuarantee == that.deliveryGuarantee
@@ -215,9 +237,8 @@ public boolean equals(Object o) {
215237
public LineageVertex getLineageVertex() {
216238
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
217239
new DefaultTypeDatasetFacet(getTypeInformation());
218-
SqlTemplateSplitEnumerator enumerator =
219-
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
220-
Optional<String> nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true);
240+
Optional<String> nameOpt =
241+
LineageUtils.tableNameOf(splitterEnumerator.lineageQueries(), true);
221242
String namespace = LineageUtils.namespaceOf(connectionProvider);
222243
LineageDataset dataset =
223244
LineageUtils.datasetOf(

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
import org.apache.flink.connector.base.DeliveryGuarantee;
2525
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
2626
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
27+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
2728
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
29+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.JdbcSqlSplitterEnumerator;
30+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
2831
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
2932
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3033
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
@@ -51,7 +54,7 @@
5154
*
5255
* <pre><code>
5356
* JdbcSource&lt;Row> source = JdbcSource.&lt;Row>builder()
54-
* .setSql(validSql)
57+
* .setSplitter(PreparedSplitterEnumerator.of(validSql))
5558
* .setResultExtractor(new RowResultExtractor())
5659
* .setDBUrl(dbUrl)
5760
* .setDriverName(driverName)
@@ -67,6 +70,7 @@
6770
*
6871
* <pre><code>
6972
*
73+
* String query = "select * from books WHERE author = ?"
7074
* Serializable[][] queryParameters = new String[2][1];
7175
* queryParameters[0] = new String[]{"Kumar"};
7276
* queryParameters[1] = new String[]{"Tan Ah Teck"};
@@ -78,13 +82,12 @@
7882
* .setUsername(username)
7983
* .setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
8084
* .setDBUrl("jdbc:derby:memory:ebookshop")
81-
* .setSql("select * from books WHERE author = ?")
82-
* .setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider(queryParameters))
85+
* .setSplitter(PreparedSplitterEnumerator.of(query, queryParameters))
8386
* .build();
8487
* </code></pre>
8588
*
8689
* @see Row
87-
* @see JdbcParameterValuesProvider
90+
* @see SplitterEnumerator
8891
* @see PreparedStatement
8992
* @see DriverManager
9093
* @see JdbcSource
@@ -118,7 +121,7 @@ public class JdbcSourceBuilder<OUT> {
118121
private JdbcParameterValuesProvider jdbcParameterValuesProvider;
119122
private @Nullable Serializable optionalSqlSplitEnumeratorState;
120123
private ResultExtractor<OUT> resultExtractor;
121-
124+
private SplitterEnumerator splitterEnumerator;
122125
private JdbcConnectionProvider connectionProvider;
123126

124127
JdbcSourceBuilder() {
@@ -132,6 +135,12 @@ public class JdbcSourceBuilder<OUT> {
132135
this.autoCommit = true;
133136
}
134137

138+
public JdbcSourceBuilder<OUT> setSplitter(SplitterEnumerator splitterEnumerator) {
139+
this.splitterEnumerator = splitterEnumerator;
140+
return this;
141+
}
142+
143+
@Deprecated
135144
public JdbcSourceBuilder<OUT> setSql(@Nonnull String sql) {
136145
Preconditions.checkArgument(
137146
!StringUtils.isNullOrWhitespaceOnly(sql),
@@ -197,6 +206,7 @@ public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings(
197206
* If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's
198207
* required to specify the {@link #continuousUnBoundingSettings}.
199208
*/
209+
@Deprecated
200210
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
201211
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
202212
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);
@@ -289,36 +299,52 @@ public JdbcSource<OUT> build() {
289299
JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
290300
this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);
291301

292-
Preconditions.checkState(
293-
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
294302
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
295303
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");
296304

297-
if (Objects.nonNull(continuousUnBoundingSettings)) {
298-
Preconditions.checkArgument(
299-
Objects.nonNull(jdbcParameterValuesProvider)
300-
&& jdbcParameterValuesProvider
301-
instanceof JdbcSlideTimingParameterProvider,
302-
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
303-
}
304-
305-
if (Objects.nonNull(jdbcParameterValuesProvider)
306-
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
307-
Preconditions.checkArgument(
308-
Objects.nonNull(continuousUnBoundingSettings),
309-
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
305+
if (this.splitterEnumerator == null) {
306+
Preconditions.checkState(
307+
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
308+
309+
if (Objects.nonNull(continuousUnBoundingSettings)) {
310+
Preconditions.checkArgument(
311+
Objects.nonNull(jdbcParameterValuesProvider)
312+
&& jdbcParameterValuesProvider
313+
instanceof JdbcSlideTimingParameterProvider,
314+
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
315+
}
316+
317+
if (Objects.nonNull(jdbcParameterValuesProvider)
318+
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
319+
Preconditions.checkArgument(
320+
Objects.nonNull(continuousUnBoundingSettings),
321+
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
322+
}
323+
324+
this.splitterEnumerator =
325+
getSplitter(sql, jdbcParameterValuesProvider, optionalSqlSplitEnumeratorState);
310326
}
311327

312328
return new JdbcSource<>(
313329
configuration,
314330
connectionProvider,
315-
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
316-
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
317-
.setSqlTemplate(sql)
318-
.setParameterValuesProvider(jdbcParameterValuesProvider),
331+
splitterEnumerator,
319332
resultExtractor,
320333
typeInformation,
321334
deliveryGuarantee,
322335
continuousUnBoundingSettings);
323336
}
337+
338+
private SplitterEnumerator getSplitter(
339+
String sqlTemplate,
340+
JdbcParameterValuesProvider parameterProvider,
341+
Serializable userDefinedState) {
342+
JdbcSqlSplitEnumeratorBase.Provider<?> provider =
343+
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
344+
.setOptionalSqlSplitEnumeratorState(userDefinedState)
345+
.setSqlTemplate(sqlTemplate)
346+
.setParameterValuesProvider(parameterProvider);
347+
348+
return new JdbcSqlSplitterEnumerator(provider);
349+
}
324350
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.apache.flink.api.connector.source.SplitEnumerator;
2424
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2525
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
26+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
2627
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
28+
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
2729
import org.apache.flink.util.Preconditions;
2830

2931
import org.slf4j.Logger;
@@ -50,47 +52,52 @@ public class JdbcSourceEnumerator
5052
private final Boundedness boundedness;
5153
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
5254
private final List<JdbcSourceSplit> unassigned;
53-
private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator;
55+
private final SplitterEnumerator splitterEnumerator;
5456
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
57+
private final JdbcConnectionProvider connectionProvider;
5558

5659
public JdbcSourceEnumerator(
5760
SplitEnumeratorContext<JdbcSourceSplit> context,
58-
JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator,
61+
SplitterEnumerator splitterEnumerator,
62+
JdbcConnectionProvider connectionProvider,
5963
ContinuousUnBoundingSettings continuousUnBoundingSettings,
6064
List<JdbcSourceSplit> unassigned) {
6165
this.context = Preconditions.checkNotNull(context);
62-
this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator);
66+
this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator);
6367
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
6468
this.boundedness =
6569
Objects.isNull(continuousUnBoundingSettings)
6670
? Boundedness.BOUNDED
6771
: Boundedness.CONTINUOUS_UNBOUNDED;
6872
this.unassigned = Preconditions.checkNotNull(unassigned);
6973
this.readersAwaitingSplit = new LinkedHashMap<>();
74+
this.connectionProvider = connectionProvider;
7075
}
7176

7277
@Override
7378
public void start() {
74-
sqlSplitEnumerator.open();
79+
splitterEnumerator.start(connectionProvider);
7580
if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED
7681
&& Objects.nonNull(continuousUnBoundingSettings)) {
7782
context.callAsync(
78-
() -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
83+
() -> splitterEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
7984
this::processNewSplits,
8085
continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(),
8186
continuousUnBoundingSettings.getDiscoveryInterval().toMillis());
8287
} else {
83-
try {
84-
unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true));
85-
} catch (IOException e) {
86-
throw new RuntimeException(e);
87-
}
88+
context.callAsync(
89+
() ->
90+
splitterEnumerator.isAllSplitsFinished()
91+
? Collections.emptyList()
92+
: splitterEnumerator.enumerateSplits(),
93+
(List<JdbcSourceSplit> splits, Throwable error) ->
94+
this.unassigned.addAll(splits));
8895
}
8996
}
9097

9198
@Override
9299
public void close() throws IOException {
93-
sqlSplitEnumerator.close();
100+
splitterEnumerator.close();
94101
}
95102

96103
@Override
@@ -136,7 +143,7 @@ public JdbcSourceEnumeratorState snapshotState(long checkpointId) throws Excepti
136143
Collections.emptyList(),
137144
Collections.emptyList(),
138145
new ArrayList<>(unassigned),
139-
sqlSplitEnumerator.optionalSqlSplitEnumeratorState);
146+
splitterEnumerator.serializableState());
140147
}
141148

142149
private Optional<JdbcSourceSplit> getNextSplit() {

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*
3535
* @param <SplitT> JDBC split type.
3636
*/
37+
@Deprecated
3738
@PublicEvolving
3839
public abstract class JdbcSqlSplitEnumeratorBase<SplitT> implements AutoCloseable, Serializable {
3940
private final char[] currentId = "0000000000".toCharArray();
@@ -78,6 +79,7 @@ public abstract List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean>
7879
*
7980
* @param <SplitT> Split type.
8081
*/
82+
@Deprecated
8183
@PublicEvolving
8284
public interface Provider<SplitT> extends Serializable {
8385

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.function.Supplier;
3939

4040
/** A split enumerator based on sql-parameters grains. */
41+
@Deprecated
4142
public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> {
4243

4344
public static final Logger LOG = LoggerFactory.getLogger(SqlTemplateSplitEnumerator.class);
@@ -105,6 +106,7 @@ public JdbcParameterValuesProvider getParameterValuesProvider() {
105106
}
106107

107108
/** The {@link TemplateSqlSplitEnumeratorProvider} for {@link SqlTemplateSplitEnumerator}. */
109+
@Deprecated
108110
public static class TemplateSqlSplitEnumeratorProvider
109111
implements JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> {
110112

0 commit comments

Comments
 (0)