Skip to content

Commit 68be9eb

Browse files
committed
[FLINK-38733] Clean Generics on source enumerator
1 parent ea84be6 commit 68be9eb

File tree

5 files changed

+16
-25
lines changed

5 files changed

+16
-25
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class JdbcSource<OUT>
7171
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
7272

7373
private final Configuration configuration;
74-
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
74+
private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
7575

7676
protected JdbcConnectionProvider connectionProvider;
7777
private final ResultExtractor<OUT> resultExtractor;
@@ -80,7 +80,7 @@ public class JdbcSource<OUT>
8080
JdbcSource(
8181
Configuration configuration,
8282
JdbcConnectionProvider connectionProvider,
83-
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
83+
JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
8484
ResultExtractor<OUT> resultExtractor,
8585
TypeInformation<OUT> typeInformation,
8686
@Nullable DeliveryGuarantee deliveryGuarantee,
@@ -167,7 +167,7 @@ public static <OUT> JdbcSourceBuilder<OUT> builder() {
167167
// ---- Visible for testing methods. ---
168168

169169
@VisibleForTesting
170-
public JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> getSqlSplitEnumeratorProvider() {
170+
public JdbcSqlSplitEnumeratorBase.Provider getSqlSplitEnumeratorProvider() {
171171
return sqlSplitEnumeratorProvider;
172172
}
173173

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ public class JdbcSourceEnumerator
5050
private final Boundedness boundedness;
5151
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
5252
private final List<JdbcSourceSplit> unassigned;
53-
private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator;
53+
private final JdbcSqlSplitEnumeratorBase sqlSplitEnumerator;
5454
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
5555

5656
public JdbcSourceEnumerator(
5757
SplitEnumeratorContext<JdbcSourceSplit> context,
58-
JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator,
58+
JdbcSqlSplitEnumeratorBase sqlSplitEnumerator,
5959
ContinuousUnBoundingSettings continuousUnBoundingSettings,
6060
List<JdbcSourceSplit> unassigned) {
6161
this.context = Preconditions.checkNotNull(context);

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,9 @@
2929
import java.util.List;
3030
import java.util.function.Supplier;
3131

32-
/**
33-
* Base class for jdbc sql split enumerator.
34-
*
35-
* @param <SplitT> JDBC split type.
36-
*/
32+
/** Base class for jdbc sql split enumerator. */
3733
@PublicEvolving
38-
public abstract class JdbcSqlSplitEnumeratorBase<SplitT> implements AutoCloseable, Serializable {
34+
public abstract class JdbcSqlSplitEnumeratorBase implements AutoCloseable, Serializable {
3935
private final char[] currentId = "0000000000".toCharArray();
4036

4137
protected @Nullable Serializable optionalSqlSplitEnumeratorState;
@@ -73,29 +69,24 @@ private static void incrementCharArrayByOne(char[] array, int pos) {
7369
public abstract List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable)
7470
throws IOException;
7571

76-
/**
77-
* A provider to create or restore a JDBC sql splits enumerator.
78-
*
79-
* @param <SplitT> Split type.
80-
*/
72+
/** A provider to create or restore a JDBC sql splits enumerator. */
8173
@PublicEvolving
82-
public interface Provider<SplitT> extends Serializable {
74+
public interface Provider extends Serializable {
8375

8476
/**
8577
* Called when init the provider without state.
8678
*
8779
* @return An instance of {@link JdbcSqlSplitEnumeratorBase}.
8880
*/
89-
JdbcSqlSplitEnumeratorBase<SplitT> create();
81+
JdbcSqlSplitEnumeratorBase create();
9082

9183
/**
9284
* Called when restore the provider without state.
9385
*
9486
* @param optionalSqlSplitEnumeratorState The state defined by users.
9587
* @return An instance of {@link JdbcSqlSplitEnumeratorBase}.
9688
*/
97-
JdbcSqlSplitEnumeratorBase<SplitT> restore(
98-
@Nullable Serializable optionalSqlSplitEnumeratorState);
89+
JdbcSqlSplitEnumeratorBase restore(@Nullable Serializable optionalSqlSplitEnumeratorState);
9990
}
10091

10192
public void open() {}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import java.util.function.Supplier;
3939

4040
/** A split enumerator based on sql-parameters grains. */
41-
public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> {
41+
public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase {
4242

4343
public static final Logger LOG = LoggerFactory.getLogger(SqlTemplateSplitEnumerator.class);
4444

@@ -106,7 +106,7 @@ public JdbcParameterValuesProvider getParameterValuesProvider() {
106106

107107
/** The {@link TemplateSqlSplitEnumeratorProvider} for {@link SqlTemplateSplitEnumerator}. */
108108
public static class TemplateSqlSplitEnumeratorProvider
109-
implements JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> {
109+
implements JdbcSqlSplitEnumeratorBase.Provider {
110110

111111
private String sqlTemplate;
112112

@@ -135,13 +135,13 @@ public TemplateSqlSplitEnumeratorProvider setOptionalSqlSplitEnumeratorState(
135135
}
136136

137137
@Override
138-
public JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> create() {
138+
public JdbcSqlSplitEnumeratorBase create() {
139139
return new SqlTemplateSplitEnumerator(
140140
this.optionalSqlSplitEnumeratorState, sqlTemplate, parameterValuesProvider);
141141
}
142142

143143
@Override
144-
public JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> restore(
144+
public JdbcSqlSplitEnumeratorBase restore(
145145
@Nullable Serializable optionalSqlSplitEnumeratorState) {
146146
return new SqlTemplateSplitEnumerator(
147147
optionalSqlSplitEnumeratorState, sqlTemplate, parameterValuesProvider);

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private static JdbcSourceEnumerator createEnumerator(
108108

109109
return new JdbcSourceEnumerator(
110110
context,
111-
new JdbcSqlSplitEnumeratorBase<JdbcSourceSplit>(null) {
111+
new JdbcSqlSplitEnumeratorBase(null) {
112112
@Override
113113
public @Nonnull List<JdbcSourceSplit> enumerateSplits(
114114
@Nonnull Supplier<Boolean> splitGettable) throws IOException {

0 commit comments

Comments
 (0)