Skip to content

Commit 6f318a5

Browse files
davidradllibenchaosnuyanzin
authored
[FLINK-33365] include filters with Lookup joins
Signed-off-by: David Radley <[email protected]> Co-authored-by: Benchao Li <[email protected]> Co-authored-by: Sergey Nuyanzin <[email protected]>
1 parent 33eae1f commit 6f318a5

File tree

9 files changed

+868
-130
lines changed

9 files changed

+868
-130
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,18 @@ public interface FieldNamedPreparedStatement extends AutoCloseable {
6969
*/
7070
static FieldNamedPreparedStatement prepareStatement(
7171
Connection connection, String sql, String[] fieldNames) throws SQLException {
72-
return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames);
72+
return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames, "", 0);
73+
}
74+
75+
static FieldNamedPreparedStatement prepareStatement(
76+
Connection connection,
77+
String sql,
78+
String[] fieldNames,
79+
String additionalPredicates,
80+
int numberOfDynamicParams)
81+
throws SQLException {
82+
return FieldNamedPreparedStatementImpl.prepareStatement(
83+
connection, sql, fieldNames, additionalPredicates, numberOfDynamicParams);
7384
}
7485

7586
/**

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,26 +178,46 @@ public void close() throws SQLException {
178178
// ----------------------------------------------------------------------------------------
179179

180180
public static FieldNamedPreparedStatement prepareStatement(
181-
Connection connection, String sql, String[] fieldNames) throws SQLException {
181+
Connection connection,
182+
String sql,
183+
String[] fieldNames,
184+
String additionalPredicates,
185+
int numberOfDynamicParams)
186+
throws SQLException {
182187
checkNotNull(connection, "connection must not be null.");
183188
checkNotNull(sql, "sql must not be null.");
184189
checkNotNull(fieldNames, "fieldNames must not be null.");
185190

186191
if (sql.contains("?")) {
187192
throw new IllegalArgumentException("SQL statement must not contain ? character.");
188193
}
194+
sql = sql + additionalPredicates;
189195

190196
HashMap<String, List<Integer>> parameterMap = new HashMap<>();
191197
String parsedSQL = parseNamedStatement(sql, parameterMap);
198+
192199
// currently, the statements must contain all the field parameters
193-
checkArgument(parameterMap.size() == fieldNames.length);
194-
int[][] indexMapping = new int[fieldNames.length][];
195-
for (int i = 0; i < fieldNames.length; i++) {
200+
final int parameterMapSize = parameterMap.size();
201+
final int fieldNamesLength = fieldNames.length;
202+
checkArgument(
203+
parameterMapSize == fieldNamesLength,
204+
"Expected "
205+
+ fieldNamesLength
206+
+ " fields, but the parsing found "
207+
+ parameterMapSize);
208+
int[][] indexMapping = new int[fieldNamesLength + numberOfDynamicParams][];
209+
int numberOfNameBasedParams = 0;
210+
for (int i = 0; i < fieldNamesLength; i++) {
196211
String fieldName = fieldNames[i];
197212
checkArgument(
198213
parameterMap.containsKey(fieldName),
199214
fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
200215
indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
216+
numberOfNameBasedParams += parameterMap.get(fieldName).size();
217+
}
218+
for (int i = 0; i < numberOfDynamicParams; ++i) {
219+
// FieldNamedPreparedStatement is 0-based, however, PreparedStatement is 1-based
220+
indexMapping[i + fieldNamesLength] = new int[] {i + numberOfNameBasedParams + 1};
201221
}
202222

203223
return new FieldNamedPreparedStatementImpl(

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
110110
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
111111
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
112112
keyNames,
113-
rowType);
113+
rowType,
114+
resolvedPredicates,
115+
pushdownParams);
114116
if (cache != null) {
115117
return PartialCachingLookupProvider.of(lookupFunction, cache);
116118
} else {

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.slf4j.LoggerFactory;
3838

3939
import java.io.IOException;
40+
import java.io.Serializable;
4041
import java.sql.Connection;
4142
import java.sql.ResultSet;
4243
import java.sql.SQLException;
@@ -45,6 +46,7 @@
4546
import java.util.Collection;
4647
import java.util.Collections;
4748
import java.util.List;
49+
import java.util.stream.Collectors;
4850

4951
import static org.apache.flink.util.Preconditions.checkArgument;
5052
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,6 +65,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
6365
private final JdbcRowConverter jdbcRowConverter;
6466
private final JdbcRowConverter lookupKeyRowConverter;
6567

68+
private final List<String> resolvedPredicates;
69+
private final Serializable[] pushdownParams;
70+
6671
private transient FieldNamedPreparedStatement statement;
6772

6873
public JdbcRowDataLookupFunction(
@@ -71,11 +76,15 @@ public JdbcRowDataLookupFunction(
7176
String[] fieldNames,
7277
DataType[] fieldTypes,
7378
String[] keyNames,
74-
RowType rowType) {
79+
RowType rowType,
80+
List<String> resolvedPredicates,
81+
Serializable[] pushdownParams) {
7582
checkNotNull(options, "No JdbcOptions supplied.");
7683
checkNotNull(fieldNames, "No fieldNames supplied.");
7784
checkNotNull(fieldTypes, "No fieldTypes supplied.");
7885
checkNotNull(keyNames, "No keyNames supplied.");
86+
checkNotNull(resolvedPredicates, "No resolvedPredicates supplied.");
87+
checkNotNull(pushdownParams, "No pushdownParams supplied.");
7988
this.connectionProvider = new SimpleJdbcConnectionProvider(options);
8089
this.keyNames = keyNames;
8190
List<String> nameList = Arrays.asList(fieldNames);
@@ -103,6 +112,8 @@ public JdbcRowDataLookupFunction(
103112
Arrays.stream(keyTypes)
104113
.map(DataType::getLogicalType)
105114
.toArray(LogicalType[]::new)));
115+
this.resolvedPredicates = resolvedPredicates;
116+
this.pushdownParams = pushdownParams;
106117
}
107118

108119
@Override
@@ -116,6 +127,15 @@ public void open(FunctionContext context) throws Exception {
116127
}
117128
}
118129

130+
private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement)
131+
throws SQLException {
132+
for (int i = 0; i < pushdownParams.length; ++i) {
133+
statement.setObject(i + keyNames.length, pushdownParams[i]);
134+
}
135+
136+
return statement;
137+
}
138+
119139
/**
120140
* This is a lookup method which is called by Flink framework in runtime.
121141
*
@@ -127,6 +147,7 @@ public Collection<RowData> lookup(RowData keyRow) {
127147
try {
128148
statement.clearParameters();
129149
statement = lookupKeyRowConverter.toExternal(keyRow, statement);
150+
statement = setPredicateParams(statement);
130151
try (ResultSet resultSet = statement.executeQuery()) {
131152
ArrayList<RowData> rows = new ArrayList<>();
132153
while (resultSet.next()) {
@@ -167,7 +188,21 @@ public Collection<RowData> lookup(RowData keyRow) {
167188

168189
private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
169190
Connection dbConn = connectionProvider.getOrEstablishConnection();
170-
statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
191+
String additionalPredicates = "";
192+
if (!resolvedPredicates.isEmpty()) {
193+
String joinedConditions =
194+
resolvedPredicates.stream()
195+
.map(pred -> String.format("(%s)", pred))
196+
.collect(Collectors.joining(" AND "));
197+
if (keyNames.length == 0) {
198+
additionalPredicates = " WHERE " + joinedConditions;
199+
} else {
200+
additionalPredicates = " AND " + joinedConditions;
201+
}
202+
}
203+
statement =
204+
FieldNamedPreparedStatement.prepareStatement(
205+
dbConn, query, keyNames, additionalPredicates, pushdownParams.length);
171206
}
172207

173208
@Override

0 commit comments

Comments
 (0)