
Commit ea84be6

[FLINK-33463] Use Source API implementation in JdbcDynamicTableSource
1 parent: 669314d
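
This commit swaps the InputFormat-based scan runtime out for the unified Source API: getScanRuntimeProvider now assembles a JdbcSource<RowData> and hands it to the planner through a DataStreamScanProvider (see the JdbcDynamicTableSource diff below). The following sketch mirrors that wiring outside the table planner, using only builder calls that appear in this commit; the driver class, URL, query, and source name are placeholder values, and it assumes a JdbcDialectConverter and a TypeInformation<RowData> for the scanned row type are already available.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.RowDataResultExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

public class JdbcSourceApiSketch {

    // Hypothetical helper; the converter and type information are assumed to be supplied
    // by the caller (the table source obtains them from the dialect and the ScanContext).
    static DataStream<RowData> scanTable(
            StreamExecutionEnvironment env,
            JdbcDialectConverter converter,
            TypeInformation<RowData> rowTypeInfo) {

        // Same builder methods the new getScanRuntimeProvider uses.
        JdbcSourceBuilder<RowData> builder =
                JdbcSource.<RowData>builder()
                        .setDriverName("org.postgresql.Driver") // placeholder
                        .setDBUrl("jdbc:postgresql://localhost:5432/shop"); // placeholder
        builder.setSql("SELECT id, amount FROM orders"); // placeholder query
        builder.setResultExtractor(new RowDataResultExtractor(converter));
        builder.setTypeInformation(rowTypeInfo);

        // The table runtime attaches the source the same way inside JdbcDataStreamScanProvider.
        return env.fromSource(
                builder.build(), WatermarkStrategy.noWatermarks(), "JdbcSource-orders");
    }
}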

4 files changed: +118 -23 lines
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor;
+
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** The result extractor for {@link RowData}. */
+public class RowDataResultExtractor implements ResultExtractor<RowData> {
+
+    private final JdbcDialectConverter jdbcDialectConverter;
+
+    public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) {
+        this.jdbcDialectConverter = Preconditions.checkNotNull(jdbcDialectConverter);
+    }
+
+    @Override
+    public RowData extract(ResultSet resultSet) throws SQLException {
+        return jdbcDialectConverter.toInternal(resultSet);
+    }
+}
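
RowDataResultExtractor adapts a JdbcDialectConverter to the source's per-row extraction hook: the reader positions the ResultSet and calls extract(...) once per row. For illustration, a hypothetical extractor for a non-RowData payload could look like the sketch below; it assumes that extract(ResultSet) is the only method of ResultExtractor that must be overridden (as the class above suggests) and that String is an acceptable output type.

import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;

import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical extractor that reads the first column of each row as a String. */
public class FirstColumnStringExtractor implements ResultExtractor<String> {

    @Override
    public String extract(ResultSet resultSet) throws SQLException {
        // The source advances the cursor; this method only reads the current row.
        return resultSet.getString(1);
    }
}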

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java

Lines changed: 3 additions & 1 deletion
@@ -124,13 +124,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 config.get(URL),
                 config.get(COMPATIBLE_MODE),
                 context.getClassLoader());
+        final String tableIdentifier = context.getObjectIdentifier().asSummaryString();
         return new JdbcDynamicTableSource(
                 getJdbcOptions(helper.getOptions(), context.getClassLoader()),
                 getJdbcReadOptions(helper.getOptions()),
                 helper.getOptions().get(LookupOptions.MAX_RETRIES),
                 getLookupCache(config),
                 helper.getOptions().get(FILTER_HANDLING_POLICY),
-                context.getPhysicalRowDataType());
+                context.getPhysicalRowDataType(),
+                tableIdentifier);
     }

     private static void validateDataTypeWithJdbcDialect(
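
The new tableIdentifier argument is the factory context's object identifier rendered with asSummaryString(), which the scan provider later uses to name the source operator. A small illustration with placeholder catalog, database, and table names:

import org.apache.flink.table.catalog.ObjectIdentifier;

public class TableIdentifierExample {
    public static void main(String[] args) {
        ObjectIdentifier id =
                ObjectIdentifier.of("default_catalog", "default_database", "orders"); // placeholders
        String tableIdentifier = id.asSummaryString(); // "default_catalog.default_database.orders"
        // Mirrors the source naming done in JdbcDataStreamScanProvider below.
        System.out.println(String.format("%s-%s", "JdbcSource", tableIdentifier));
    }
}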

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java

Lines changed: 64 additions & 17 deletions
@@ -19,18 +19,27 @@
 package org.apache.flink.connector.jdbc.core.table.source;

 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
+import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.RowDataResultExtractor;
 import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
 import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
 import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -39,6 +48,7 @@
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
@@ -70,6 +80,8 @@ public class JdbcDynamicTableSource
                 SupportsFilterPushDown {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);

+    private static final String JDBC_TRANSFORMATION = "jdbc";
+
     private final InternalJdbcConnectionOptions options;
     private final JdbcReadOptions readOptions;
     private final int lookupMaxRetryTimes;
@@ -80,21 +92,24 @@ public class JdbcDynamicTableSource
     private long limit = -1;
     private List<String> resolvedPredicates = new ArrayList<>();
     private Serializable[] pushdownParams = new Serializable[0];
+    private final String tableIdentifier;

     public JdbcDynamicTableSource(
             InternalJdbcConnectionOptions options,
             JdbcReadOptions readOptions,
             int lookupMaxRetryTimes,
             @Nullable LookupCache cache,
             FilterHandlingPolicy filterHandlingPolicy,
-            DataType physicalRowDataType) {
+            DataType physicalRowDataType,
+            String tableIdentifier) {
         this.options = options;
         this.readOptions = readOptions;
         this.lookupMaxRetryTimes = lookupMaxRetryTimes;
         this.cache = cache;
         this.filterHandlingPolicy = filterHandlingPolicy;
         this.physicalRowDataType = physicalRowDataType;
         this.dialectName = options.getDialect().dialectName();
+        this.tableIdentifier = tableIdentifier;
     }

     @Override
@@ -126,17 +141,17 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
     }

     @Override
-    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-        final JdbcRowDataInputFormat.Builder builder =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(options.getDriverName())
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        final JdbcSourceBuilder<RowData> builder =
+                JdbcSource.<RowData>builder()
+                        .setDriverName(options.getDriverName())
                         .setDBUrl(options.getDbURL())
                         .setUsername(options.getUsername().orElse(null))
                         .setPassword(options.getPassword().orElse(null))
                         .setAutoCommit(readOptions.getAutoCommit());

         if (readOptions.getFetchSize() != 0) {
-            builder.setFetchSize(readOptions.getFetchSize());
+            builder.setResultSetFetchSize(readOptions.getFetchSize());
         }
         final JdbcDialect dialect = options.getDialect();
         String query =
@@ -158,19 +173,19 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
                                     .ofBatchNum(numPartitions),
                             new JdbcGenericParameterValuesProvider(allPushdownParams));

-            builder.setParametersProvider(allParams);
+            builder.setJdbcParameterValuesProvider(allParams);

             predicates.add(
                     dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
                             + " BETWEEN ? AND ?");
         } else {
-            builder.setParametersProvider(
+            builder.setJdbcParameterValuesProvider(
                     new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
         }

         predicates.addAll(this.resolvedPredicates);

-        if (predicates.size() > 0) {
+        if (!predicates.isEmpty()) {
             String joinedConditions =
                     predicates.stream()
                             .map(pred -> String.format("(%s)", pred))
@@ -184,13 +199,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon

         LOG.debug("Query generated for JDBC scan: " + query);

-        builder.setQuery(query);
+        builder.setSql(query);
         final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
-        builder.setRowConverter(dialect.getRowConverter(rowType));
-        builder.setRowDataTypeInfo(
-                runtimeProviderContext.createTypeInformation(physicalRowDataType));
-
-        return InputFormatProvider.of(builder.build());
+        builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType)));
+        builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
+        options.getProperties()
+                .forEach(
+                        (key, value) ->
+                                builder.setConnectionProperty(key.toString(), value.toString()));
+        return new JdbcDataStreamScanProvider(builder.build(), tableIdentifier);
     }

     @Override
@@ -218,7 +235,8 @@ public DynamicTableSource copy() {
                         lookupMaxRetryTimes,
                         cache,
                         filterHandlingPolicy,
-                        physicalRowDataType);
+                        physicalRowDataType,
+                        tableIdentifier);
         newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
         newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
         return newSource;
@@ -314,4 +332,33 @@ private Serializable[][] replicatePushdownParamsForN(int n) {
         }
         return allPushdownParams;
     }
+
+    private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {
+
+        private final JdbcSource<RowData> source;
+        private final String tableIdentifier;
+
+        public JdbcDataStreamScanProvider(JdbcSource<RowData> source, String tableIdentifier) {
+            this.source = Preconditions.checkNotNull(source);
+            this.tableIdentifier = Preconditions.checkNotNull(tableIdentifier);
+        }
+
+        @Override
+        public DataStream<RowData> produceDataStream(
+                ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+            DataStreamSource<RowData> sourceStream =
+                    execEnv.fromSource(
+                            source,
+                            WatermarkStrategy.noWatermarks(),
+                            String.format(
+                                    "%s-%s", JdbcSource.class.getSimpleName(), tableIdentifier));
+            providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
+            return sourceStream;
+        }
+
+        @Override
+        public boolean isBounded() {
+            return source.getBoundedness() == Boundedness.BOUNDED;
+        }
+    }
 }
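
From the SQL/Table API side nothing changes; the same DDL now runs its scans on JdbcSource instead of JdbcRowDataInputFormat. A minimal usage sketch with placeholder connection options (the option keys shown are the standard 'jdbc' connector options):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcScanUsage {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:postgresql://localhost:5432/shop',\n" // placeholder
                        + "  'table-name' = 'orders'\n"
                        + ")");

        // The scan of this table is now planned as a DataStreamScanProvider over JdbcSource.
        tEnv.executeSql("SELECT id, amount FROM orders").print();
    }
}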

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java

Lines changed: 10 additions & 5 deletions
@@ -91,7 +91,8 @@ void testJdbcCommonProperties() {
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
                         FilterHandlingPolicy.NEVER,
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");
         assertThat(actualSource).isEqualTo(expectedSource);

         // validation for sink
@@ -149,7 +150,8 @@ void testJdbcReadProperties() {
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");

         assertThat(actual).isEqualTo(expected);
     }
@@ -178,7 +180,8 @@ void testJdbcLookupProperties() {
                         10,
                         DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");

         assertThat(actual).isEqualTo(expected);
     }
@@ -207,7 +210,8 @@ void testJdbcLookupPropertiesWithLegacyOptions() {
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                 .build(),
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");

         assertThat(actual).isEqualTo(expected);
     }
@@ -393,7 +397,8 @@ void testJdbcLookupPropertiesWithExcludeEmptyResult() {
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                 .build(),
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");

         assertThat(actual).isEqualTo(expected);
     }

0 comments