Skip to content

Commit 35399ed

Browse files
committed
[FLINK-38733] Add new SplitterEnumerator on JdbcSource
1 parent: 68be9eb; commit: 35399ed

File tree

14 files changed: +418 / -314 lines changed

flink-connector-jdbc-architecture/archunit-violations/afc13e03-aa50-408b-b87e-aa491cf0b8cb

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,6 @@ Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getCon
3737
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getResultExtractor()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcSource.java:0)
3838
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getSqlSplitEnumeratorProvider()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcSource.java:0)
3939
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getTypeInformation()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcSource.java:0)
40-
Method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getParameterValuesProvider()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SqlTemplateSplitEnumerator.java:0)
41-
Method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SqlTemplateSplitEnumerator.java:0)
4240
Method <org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader.getConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcSourceSplitReader.java:0)
4341
Method <org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader.getResultSet()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcSourceSplitReader.java:0)
4442
Method <org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader.getSplits()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcSourceSplitReader.java:0)
Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1 +0,0 @@
1-
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 13 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -34,8 +34,7 @@
3434
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer;
3535
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
3636
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
37-
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
38-
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
37+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
3938
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
4039
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
4140
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
@@ -71,7 +70,7 @@ public class JdbcSource<OUT>
7170
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
7271

7372
private final Configuration configuration;
74-
private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
73+
private final SplitterEnumerator splitterEnumerator;
7574

7675
protected JdbcConnectionProvider connectionProvider;
7776
private final ResultExtractor<OUT> resultExtractor;
@@ -80,14 +79,14 @@ public class JdbcSource<OUT>
8079
JdbcSource(
8180
Configuration configuration,
8281
JdbcConnectionProvider connectionProvider,
83-
JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
82+
SplitterEnumerator splitterEnumerator,
8483
ResultExtractor<OUT> resultExtractor,
8584
TypeInformation<OUT> typeInformation,
8685
@Nullable DeliveryGuarantee deliveryGuarantee,
8786
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
8887
this.configuration = Preconditions.checkNotNull(configuration);
8988
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
90-
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
89+
this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator);
9190
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
9291
this.deliveryGuarantee =
9392
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
@@ -125,7 +124,8 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumera
125124
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
126125
return new JdbcSourceEnumerator(
127126
enumContext,
128-
sqlSplitEnumeratorProvider.create(),
127+
splitterEnumerator,
128+
connectionProvider,
129129
continuousUnBoundingSettings,
130130
new ArrayList<>());
131131
}
@@ -139,7 +139,8 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
139139
checkpoint.getOptionalUserDefinedSplitEnumeratorState();
140140
return new JdbcSourceEnumerator(
141141
enumContext,
142-
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
142+
splitterEnumerator.restoreState(optionalUserDefinedSplitEnumeratorState),
143+
connectionProvider,
143144
continuousUnBoundingSettings,
144145
checkpoint.getRemainingSplits());
145146
}
@@ -167,8 +168,8 @@ public static <OUT> JdbcSourceBuilder<OUT> builder() {
167168
// ---- Visible for testing methods. ---
168169

169170
@VisibleForTesting
170-
public JdbcSqlSplitEnumeratorBase.Provider getSqlSplitEnumeratorProvider() {
171-
return sqlSplitEnumeratorProvider;
171+
public SplitterEnumerator getSplitterEnumerator() {
172+
return splitterEnumerator;
172173
}
173174

174175
@VisibleForTesting
@@ -204,7 +205,7 @@ public boolean equals(Object o) {
204205
return boundedness == that.boundedness
205206
&& Objects.equals(typeInformation, that.typeInformation)
206207
&& Objects.equals(configuration, that.configuration)
207-
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
208+
&& Objects.equals(splitterEnumerator, that.splitterEnumerator)
208209
&& Objects.equals(connectionProvider, that.connectionProvider)
209210
&& Objects.equals(resultExtractor, that.resultExtractor)
210211
&& deliveryGuarantee == that.deliveryGuarantee
@@ -215,9 +216,8 @@ public boolean equals(Object o) {
215216
public LineageVertex getLineageVertex() {
216217
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
217218
new DefaultTypeDatasetFacet(getTypeInformation());
218-
SqlTemplateSplitEnumerator enumerator =
219-
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
220-
Optional<String> nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true);
219+
Optional<String> nameOpt =
220+
LineageUtils.tableNameOf(splitterEnumerator.lineageQueries(), true);
221221
String namespace = LineageUtils.namespaceOf(connectionProvider);
222222
LineageDataset dataset =
223223
LineageUtils.datasetOf(

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java

Lines changed: 46 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,9 @@
2424
import org.apache.flink.connector.base.DeliveryGuarantee;
2525
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
2626
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
27-
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
27+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.PreparedSplitterEnumerator;
28+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SlideTimingSplitterEnumerator;
29+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
2830
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
2931
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3032
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
@@ -118,7 +120,7 @@ public class JdbcSourceBuilder<OUT> {
118120
private JdbcParameterValuesProvider jdbcParameterValuesProvider;
119121
private @Nullable Serializable optionalSqlSplitEnumeratorState;
120122
private ResultExtractor<OUT> resultExtractor;
121-
123+
private SplitterEnumerator splitterEnumerator;
122124
private JdbcConnectionProvider connectionProvider;
123125

124126
JdbcSourceBuilder() {
@@ -132,6 +134,11 @@ public class JdbcSourceBuilder<OUT> {
132134
this.autoCommit = true;
133135
}
134136

137+
public JdbcSourceBuilder<OUT> setSplitter(SplitterEnumerator splitterEnumerator) {
138+
this.splitterEnumerator = splitterEnumerator;
139+
return this;
140+
}
141+
135142
public JdbcSourceBuilder<OUT> setSql(@Nonnull String sql) {
136143
Preconditions.checkArgument(
137144
!StringUtils.isNullOrWhitespaceOnly(sql),
@@ -289,36 +296,54 @@ public JdbcSource<OUT> build() {
289296
JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
290297
this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);
291298

292-
Preconditions.checkState(
293-
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
294299
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
295300
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");
296301

297-
if (Objects.nonNull(continuousUnBoundingSettings)) {
298-
Preconditions.checkArgument(
299-
Objects.nonNull(jdbcParameterValuesProvider)
300-
&& jdbcParameterValuesProvider
301-
instanceof JdbcSlideTimingParameterProvider,
302-
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
303-
}
304-
305-
if (Objects.nonNull(jdbcParameterValuesProvider)
306-
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
307-
Preconditions.checkArgument(
308-
Objects.nonNull(continuousUnBoundingSettings),
309-
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
302+
if (this.splitterEnumerator == null) {
303+
Preconditions.checkState(
304+
!StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty.");
305+
306+
if (Objects.nonNull(continuousUnBoundingSettings)) {
307+
Preconditions.checkArgument(
308+
Objects.nonNull(jdbcParameterValuesProvider)
309+
&& jdbcParameterValuesProvider
310+
instanceof JdbcSlideTimingParameterProvider,
311+
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
312+
}
313+
314+
if (Objects.nonNull(jdbcParameterValuesProvider)
315+
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
316+
Preconditions.checkArgument(
317+
Objects.nonNull(continuousUnBoundingSettings),
318+
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
319+
}
320+
321+
this.splitterEnumerator =
322+
getSplitter(sql, jdbcParameterValuesProvider, optionalSqlSplitEnumeratorState);
310323
}
311324

312325
return new JdbcSource<>(
313326
configuration,
314327
connectionProvider,
315-
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
316-
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
317-
.setSqlTemplate(sql)
318-
.setParameterValuesProvider(jdbcParameterValuesProvider),
328+
splitterEnumerator,
319329
resultExtractor,
320330
typeInformation,
321331
deliveryGuarantee,
322332
continuousUnBoundingSettings);
323333
}
334+
335+
private SplitterEnumerator getSplitter(
336+
String sqlTemplate,
337+
JdbcParameterValuesProvider parameterProvider,
338+
Serializable userDefinedState) {
339+
if (parameterProvider == null) {
340+
return PreparedSplitterEnumerator.of(sqlTemplate);
341+
} else if (parameterProvider instanceof JdbcSlideTimingParameterProvider) {
342+
return SlideTimingSplitterEnumerator.of(
343+
sqlTemplate, parameterProvider, userDefinedState);
344+
} else {
345+
return PreparedSplitterEnumerator.of(
346+
sqlTemplate, parameterProvider.getParameterValues());
347+
}
348+
}
324349
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java

Lines changed: 19 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,9 @@
2323
import org.apache.flink.api.connector.source.SplitEnumerator;
2424
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2525
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
26+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter.SplitterEnumerator;
2627
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
28+
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
2729
import org.apache.flink.util.Preconditions;
2830

2931
import org.slf4j.Logger;
@@ -50,47 +52,52 @@ public class JdbcSourceEnumerator
5052
private final Boundedness boundedness;
5153
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
5254
private final List<JdbcSourceSplit> unassigned;
53-
private final JdbcSqlSplitEnumeratorBase sqlSplitEnumerator;
55+
private final SplitterEnumerator splitterEnumerator;
5456
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
57+
private final JdbcConnectionProvider connectionProvider;
5558

5659
public JdbcSourceEnumerator(
5760
SplitEnumeratorContext<JdbcSourceSplit> context,
58-
JdbcSqlSplitEnumeratorBase sqlSplitEnumerator,
61+
SplitterEnumerator splitterEnumerator,
62+
JdbcConnectionProvider connectionProvider,
5963
ContinuousUnBoundingSettings continuousUnBoundingSettings,
6064
List<JdbcSourceSplit> unassigned) {
6165
this.context = Preconditions.checkNotNull(context);
62-
this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator);
66+
this.splitterEnumerator = Preconditions.checkNotNull(splitterEnumerator);
6367
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
6468
this.boundedness =
6569
Objects.isNull(continuousUnBoundingSettings)
6670
? Boundedness.BOUNDED
6771
: Boundedness.CONTINUOUS_UNBOUNDED;
6872
this.unassigned = Preconditions.checkNotNull(unassigned);
6973
this.readersAwaitingSplit = new LinkedHashMap<>();
74+
this.connectionProvider = connectionProvider;
7075
}
7176

7277
@Override
7378
public void start() {
74-
sqlSplitEnumerator.open();
79+
splitterEnumerator.start(connectionProvider);
7580
if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED
7681
&& Objects.nonNull(continuousUnBoundingSettings)) {
7782
context.callAsync(
78-
() -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
83+
() -> splitterEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
7984
this::processNewSplits,
8085
continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(),
8186
continuousUnBoundingSettings.getDiscoveryInterval().toMillis());
8287
} else {
83-
try {
84-
unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true));
85-
} catch (IOException e) {
86-
throw new RuntimeException(e);
87-
}
88+
context.callAsync(
89+
() ->
90+
splitterEnumerator.hasFinishSplits()
91+
? Collections.emptyList()
92+
: splitterEnumerator.enumerateSplits(),
93+
(List<JdbcSourceSplit> splits, Throwable error) ->
94+
this.unassigned.addAll(splits));
8895
}
8996
}
9097

9198
@Override
9299
public void close() throws IOException {
93-
sqlSplitEnumerator.close();
100+
splitterEnumerator.close();
94101
}
95102

96103
@Override
@@ -136,7 +143,7 @@ public JdbcSourceEnumeratorState snapshotState(long checkpointId) throws Excepti
136143
Collections.emptyList(),
137144
Collections.emptyList(),
138145
new ArrayList<>(unassigned),
139-
sqlSplitEnumerator.optionalSqlSplitEnumeratorState);
146+
splitterEnumerator.serializableState());
140147
}
141148

142149
private Optional<JdbcSourceSplit> getNextSplit() {

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments (0)