Skip to content

Commit cdbee4f

Browse files
committed
[FLINK-38733] Add new SplitterEnumerator on JdbcSource
1 parent 59dea79 commit cdbee4f

File tree

5 files changed

+45
-7
lines changed

5 files changed

+45
-7
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void start() {
8787
} else {
8888
context.callAsync(
8989
() ->
90-
splitterEnumerator.hasFinishSplits()
90+
splitterEnumerator.isAllSplitsFinished()
9191
? Collections.emptyList()
9292
: splitterEnumerator.enumerateSplits(),
9393
(List<JdbcSourceSplit> splits, Throwable error) ->

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterEnumerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class PreparedSplitterEnumerator implements SplitterEnumerator {
4444

4545
private final String sqlTemplate;
4646
private final Serializable[][] sqlParameters;
47-
private Boolean finished;
47+
private boolean finished;
4848

4949
protected PreparedSplitterEnumerator(String sqlTemplate, Serializable[][] sqlParameters) {
5050
this.sqlTemplate = Preconditions.checkNotNull(sqlTemplate);
@@ -68,7 +68,7 @@ public void start(JdbcConnectionProvider connectionProvider) {}
6868
public void close() {}
6969

7070
@Override
71-
public boolean hasFinishSplits() {
71+
public boolean isAllSplitsFinished() {
7272
return this.finished;
7373
}
7474

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SlideTimingSplitterEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static SlideTimingSplitterEnumerator of(
5252
}
5353

5454
@Override
55-
public boolean hasFinishSplits() {
55+
public boolean isAllSplitsFinished() {
5656
return false;
5757
}
5858

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SplitterEnumerator.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter;
2020

21+
import org.apache.flink.annotation.PublicEvolving;
2122
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
2223
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
2324

@@ -29,30 +30,67 @@
2930
import java.util.function.Supplier;
3031

3132
/** Interface for jdbc sql split enumerator. */
33+
@PublicEvolving
3234
public interface SplitterEnumerator extends AutoCloseable, Serializable {
3335

36+
/**
37+
* Start the enumerator.
38+
*
39+
* @param connectionProvider The JDBC connection provider.
40+
*/
3441
void start(JdbcConnectionProvider connectionProvider);
3542

43+
/** Close the enumerator. */
3644
void close();
3745

38-
boolean hasFinishSplits();
46+
/** All splits have been enumerated. */
47+
boolean isAllSplitsFinished();
3948

49+
/** Enumerate the JDBC splits. */
4050
List<JdbcSourceSplit> enumerateSplits();
4151

52+
/**
53+
* Enumerate the JDBC splits.
54+
*
55+
* @param splitGettable If the next batch splits are gettable.
56+
* @return The result splits generated by the split enumerator.
57+
*/
4258
default List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable) {
4359
return enumerateSplits(splitGettable.get());
4460
}
4561

46-
default List<JdbcSourceSplit> enumerateSplits(Boolean splitGettable) {
62+
/**
63+
* Enumerate the JDBC splits.
64+
*
65+
* @param splitGettable If the next batch splits are gettable.
66+
* @return The result splits generated by the split enumerator.
67+
*/
68+
default List<JdbcSourceSplit> enumerateSplits(boolean splitGettable) {
4769
if (!splitGettable) {
4870
return Collections.emptyList();
4971
}
5072
return enumerateSplits();
5173
}
5274

75+
/**
76+
* Get lineage queries for splits.
77+
*
78+
* @return The lineage queries.
79+
*/
5380
List<String> lineageQueries();
5481

82+
/**
83+
* Get the serializable state of the enumerator.
84+
*
85+
* @return The serializable state.
86+
*/
5587
Serializable serializableState();
5688

89+
/**
90+
* Restore the enumerator state from the given serializable state.
91+
*
92+
* @param state The serializable state.
93+
* @return The restored SplitterEnumerator.
94+
*/
5795
SplitterEnumerator restoreState(Serializable state);
5896
}

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void start(JdbcConnectionProvider connectionProvider) {}
114114
public void close() {}
115115

116116
@Override
117-
public boolean hasFinishSplits() {
117+
public boolean isAllSplitsFinished() {
118118
return false;
119119
}
120120

0 commit comments

Comments
 (0)