Skip to content

Commit c677143

Browse files
author
och5351
committed
[hotfix] Support passing arbitrary database options to JDBC Catalog
1 parent cc5e292 commit c677143

File tree

31 files changed

+1184
-27
lines changed

31 files changed

+1184
-27
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ default JdbcDialect createDialect(String compatibleMode) {
6464
"Not supported option 'compatible-mode' with value: " + compatibleMode);
6565
}
6666

67+
JdbcCatalog createCatalog(
68+
ClassLoader classLoader,
69+
String catalogName,
70+
String username,
71+
String pwd,
72+
String baseUrl);
73+
6774
JdbcCatalog createCatalog(
6875
ClassLoader classLoader,
6976
String catalogName,

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactoryLoader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ public static JdbcDialect loadDialect(
5858
return load(url, classLoader).createDialect(compatibleMode);
5959
}
6060

61+
public static JdbcCatalog loadCatalog(
62+
ClassLoader classLoader,
63+
String catalogName,
64+
String username,
65+
String pwd,
66+
String baseUrl) {
67+
return loadCatalog(classLoader, catalogName, null, username, pwd, baseUrl);
68+
}
69+
6170
public static JdbcCatalog loadCatalog(
6271
ClassLoader classLoader,
6372
String catalogName,

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import java.util.Objects;
7474
import java.util.Optional;
7575
import java.util.Properties;
76+
import java.util.function.Function;
7677
import java.util.function.Predicate;
7778

7879
import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.PASSWORD_KEY;
@@ -96,6 +97,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog implements Jdb
9697
protected final ClassLoader userClassLoader;
9798
protected final String baseUrl;
9899
protected final String defaultUrl;
100+
protected final Function<String, String> urlFunction;
99101
protected final Properties connectionProperties;
100102

101103
@Deprecated
@@ -120,28 +122,23 @@ public AbstractJdbcCatalog(
120122
String defaultDatabase,
121123
String baseUrl,
122124
Properties connectionProperties) {
123-
super(catalogName, defaultDatabase);
125+
super(catalogName, validateJdbcUrl(baseUrl, defaultDatabase));
124126

125127
checkNotNull(userClassLoader);
126128
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
127129

128-
validateJdbcUrl(baseUrl);
129-
130130
this.userClassLoader = userClassLoader;
131+
this.urlFunction = calculateUrlFunction(baseUrl);
131132
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
132-
this.defaultUrl = getDatabaseUrl(defaultDatabase);
133133
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
134+
this.defaultUrl = this.urlFunction.apply(defaultDatabase);
134135
checkArgument(
135136
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
136137
checkArgument(
137138
!StringUtils.isNullOrWhitespaceOnly(
138139
connectionProperties.getProperty(PASSWORD_KEY)));
139140
}
140141

141-
protected String getDatabaseUrl(String databaseName) {
142-
return baseUrl + databaseName;
143-
}
144-
145142
@Override
146143
public void open() throws CatalogException {
147144
// load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail
@@ -274,7 +271,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
274271
String databaseName = tablePath.getDatabaseName();
275272

276273
try (Connection conn =
277-
DriverManager.getConnection(getDatabaseUrl(databaseName), connectionProperties)) {
274+
DriverManager.getConnection(
275+
this.urlFunction.apply(databaseName), connectionProperties)) {
278276
DatabaseMetaData metaData = conn.getMetaData();
279277
Optional<UniqueConstraint> primaryKey =
280278
getPrimaryKey(
@@ -315,7 +313,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
315313
protected Map<String, String> getOptions(ObjectPath tablePath) {
316314
Map<String, String> props = new HashMap<>();
317315
props.put(CONNECTOR.key(), IDENTIFIER);
318-
props.put(URL.key(), getDatabaseUrl(tablePath.getDatabaseName()));
316+
props.put(URL.key(), this.urlFunction.apply(tablePath.getDatabaseName()));
319317
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
320318
props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY));
321319
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
@@ -576,14 +574,57 @@ protected String getSchemaTableName(ObjectPath tablePath) {
576574
throw new UnsupportedOperationException();
577575
}
578576

577+
private Function<String, String> calculateUrlFunction(String url) {
578+
final String[] parts;
579+
final int questionMarkIndex = url.indexOf('?');
580+
if (questionMarkIndex == -1) {
581+
parts = url.split("/+", 3);
582+
return dbName -> parts.length == 3 ? url.trim() : url.trim() + "/" + dbName;
583+
} else {
584+
String withoutParams = url.substring(0, questionMarkIndex);
585+
String prefix = withoutParams.substring(0, withoutParams.lastIndexOf('/') + 1);
586+
return dbName ->
587+
dbName == null ? url : prefix + dbName + "?" + url.substring(questionMarkIndex);
588+
}
589+
}
590+
579591
/**
580592
* URL has to be without database, like "jdbc:dialect://localhost:1234/" or
581593
* "jdbc:dialect://localhost:1234" rather than "jdbc:dialect://localhost:1234/db".
582594
*/
583-
protected static void validateJdbcUrl(String url) {
584-
String[] parts = url.trim().split("\\/+");
585-
586-
checkArgument(parts.length == 2);
595+
protected static String validateJdbcUrl(String url, String defaultDatabase) {
596+
String trimmedUrl = url.trim();
597+
String processedUrl =
598+
trimmedUrl.endsWith("/")
599+
? trimmedUrl.substring(0, trimmedUrl.length() - 1)
600+
: trimmedUrl;
601+
String[] parts = processedUrl.split("/+", 3);
602+
String database;
603+
int questionMark = trimmedUrl.indexOf('?');
604+
if (questionMark == -1) {
605+
if (defaultDatabase == null) {
606+
checkArgument(parts.length > 2);
607+
database = parts[2];
608+
} else {
609+
checkArgument(
610+
parts.length == 2
611+
|| (parts.length == 3 && defaultDatabase.equals(parts[2])));
612+
database = defaultDatabase;
613+
}
614+
} else {
615+
checkArgument(parts.length > 2);
616+
questionMark = parts[2].indexOf('?');
617+
if (defaultDatabase == null) {
618+
database = parts[2].substring(0, questionMark);
619+
} else {
620+
checkArgument(
621+
questionMark > -1
622+
&& Objects.equals(
623+
parts[2].substring(0, questionMark), defaultDatabase));
624+
database = defaultDatabase;
625+
}
626+
}
627+
return database;
587628
}
588629

589630
@Override

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public String factoryIdentifier() {
5151
@Override
5252
public Set<ConfigOption<?>> requiredOptions() {
5353
final Set<ConfigOption<?>> options = new HashSet<>();
54-
options.add(DEFAULT_DATABASE);
5554
options.add(USERNAME);
5655
options.add(PASSWORD);
5756
options.add(BASE_URL);
@@ -61,6 +60,7 @@ public Set<ConfigOption<?>> requiredOptions() {
6160
@Override
6261
public Set<ConfigOption<?>> optionalOptions() {
6362
final Set<ConfigOption<?>> options = new HashSet<>();
63+
options.add(DEFAULT_DATABASE);
6464
options.add(PROPERTY_VERSION);
6565
options.add(COMPATIBLE_MODE);
6666
return options;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class JdbcCatalogFactoryOptions {
3434
public static final ConfigOption<String> DEFAULT_DATABASE =
3535
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
3636
.stringType()
37-
.noDefaultValue();
37+
.defaultValue(null);
3838

3939
public static final ConfigOption<String> USERNAME = JdbcConnectorOptions.USERNAME;
4040

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/derby/database/DerbyFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ public JdbcDialect createDialect() {
3737
return new DerbyDialect();
3838
}
3939

40+
@Override
41+
public JdbcCatalog createCatalog(
42+
ClassLoader classLoader,
43+
String catalogName,
44+
String username,
45+
String pwd,
46+
String baseUrl) {
47+
throw new UnsupportedOperationException("Catalog for Derby is not supported yet.");
48+
}
49+
4050
@Override
4151
public JdbcCatalog createCatalog(
4252
ClassLoader classLoader,

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,33 @@ class AbstractJdbcCatalogTest {
2727

2828
@Test
2929
void testJdbcUrl() {
30-
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/");
31-
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234");
30+
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", "db");
31+
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", null);
32+
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/", "db");
33+
AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234", "db");
3234
}
3335

3436
@Test
3537
void testInvalidJdbcUrl() {
3638
assertThatThrownBy(
3739
() ->
3840
AbstractJdbcCatalog.validateJdbcUrl(
39-
"jdbc:dialect://localhost:1234/db"))
41+
"jdbc:dialect://localhost:1234", null))
42+
.isInstanceOf(IllegalArgumentException.class);
43+
assertThatThrownBy(
44+
() ->
45+
AbstractJdbcCatalog.validateJdbcUrl(
46+
"jdbc:dialect://localhost:1234/", null))
47+
.isInstanceOf(IllegalArgumentException.class);
48+
assertThatThrownBy(
49+
() ->
50+
AbstractJdbcCatalog.validateJdbcUrl(
51+
"jdbc:dialect://localhost:1234/db", ""))
52+
.isInstanceOf(IllegalArgumentException.class);
53+
assertThatThrownBy(
54+
() ->
55+
AbstractJdbcCatalog.validateJdbcUrl(
56+
"jdbc:dialect://localhost:1234/db", "not_db"))
4057
.isInstanceOf(IllegalArgumentException.class);
4158
}
4259
}

flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public JdbcDialect createDialect() {
3838
return new CrateDBDialect();
3939
}
4040

41+
@Override
42+
public JdbcCatalog createCatalog(
43+
ClassLoader classLoader,
44+
String catalogName,
45+
String username,
46+
String pwd,
47+
String baseUrl) {
48+
return new CrateDBCatalog(classLoader, catalogName, null, username, pwd, baseUrl);
49+
}
50+
4151
@Override
4252
public JdbcCatalog createCatalog(
4353
ClassLoader classLoader,

flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
135135

136136
String searchPath =
137137
extractColumnValuesBySQL(
138-
getDatabaseUrl(DEFAULT_DATABASE), "show search_path", 1, null)
138+
urlFunction.apply(DEFAULT_DATABASE), "show search_path", 1, null)
139139
.get(0);
140140
String[] schemas = searchPath.split("\\s*,\\s*");
141141

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.cratedb.database;
20+
21+
import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
22+
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
23+
import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
24+
import org.apache.flink.connector.jdbc.cratedb.CrateDBTestBase;
25+
import org.apache.flink.connector.jdbc.cratedb.database.catalog.CrateDBCatalog;
26+
import org.apache.flink.table.catalog.Catalog;
27+
import org.apache.flink.table.catalog.CommonCatalogOptions;
28+
import org.apache.flink.table.factories.FactoryUtil;
29+
30+
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.api.Test;
32+
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
/**
39+
* Test class for CrateDB database catalog factory without default database option.
40+
*
41+
* <p>This test verifies catalog creation when the default database option is not provided, ensuring
42+
* that {@link CrateDBCatalog} is properly instantiated using only the base URL configuration.
43+
*/
44+
public class CrateDBFactoryDefaultDatabaseNullTest implements CrateDBTestBase {
45+
protected static String baseUrl;
46+
protected static JdbcCatalog catalog;
47+
protected static final String TEST_CATALOG_NAME = "mycratedb";
48+
49+
@BeforeEach
50+
void setup() {
51+
// jdbc:crate://localhost:49290/crate
52+
baseUrl = getMetadata().getJdbcUrl();
53+
54+
catalog =
55+
JdbcFactoryLoader.loadCatalog(
56+
Thread.currentThread().getContextClassLoader(),
57+
TEST_CATALOG_NAME,
58+
getMetadata().getUsername(),
59+
getMetadata().getPassword(),
60+
baseUrl);
61+
}
62+
63+
@Test
64+
void test() {
65+
final Map<String, String> options = new HashMap<>();
66+
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER);
67+
options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername());
68+
options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword());
69+
options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
70+
71+
final Catalog actualCatalog =
72+
FactoryUtil.createCatalog(
73+
TEST_CATALOG_NAME,
74+
options,
75+
null,
76+
Thread.currentThread().getContextClassLoader());
77+
78+
checkEquals(catalog, (JdbcCatalog) actualCatalog);
79+
80+
assertThat((JdbcCatalog) actualCatalog).isInstanceOf(CrateDBCatalog.class);
81+
}
82+
83+
private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) {
84+
assertThat(c2).isEqualTo(c1);
85+
}
86+
}

0 commit comments

Comments
 (0)