diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactory.java index 65fc1f051..b7bbe40cf 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactory.java @@ -64,6 +64,13 @@ default JdbcDialect createDialect(String compatibleMode) { "Not supported option 'compatible-mode' with value: " + compatibleMode); } + JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl); + JdbcCatalog createCatalog( ClassLoader classLoader, String catalogName, diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactoryLoader.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactoryLoader.java index ff5ddd11e..e7e280e4d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactoryLoader.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/JdbcFactoryLoader.java @@ -58,6 +58,15 @@ public static JdbcDialect loadDialect( return load(url, classLoader).createDialect(compatibleMode); } + public static JdbcCatalog loadCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + return loadCatalog(classLoader, catalogName, null, username, pwd, baseUrl); + } + public static JdbcCatalog loadCatalog( ClassLoader classLoader, String catalogName, diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java index 734245985..3d9ede190 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java @@ -73,6 +73,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.function.Function; import java.util.function.Predicate; import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.PASSWORD_KEY; @@ -96,6 +97,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog implements Jdb protected final ClassLoader userClassLoader; protected final String baseUrl; protected final String defaultUrl; + protected final Function urlFunction; protected final Properties connectionProperties; @Deprecated @@ -120,17 +122,16 @@ public AbstractJdbcCatalog( String defaultDatabase, String baseUrl, Properties connectionProperties) { - super(catalogName, defaultDatabase); + super(catalogName, validateJdbcUrl(baseUrl, defaultDatabase)); checkNotNull(userClassLoader); checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); - validateJdbcUrl(baseUrl); - this.userClassLoader = userClassLoader; + this.urlFunction = calculateUrlFunction(baseUrl); this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; - this.defaultUrl = getDatabaseUrl(defaultDatabase); this.connectionProperties = Preconditions.checkNotNull(connectionProperties); + this.defaultUrl = this.urlFunction.apply(defaultDatabase); checkArgument( !StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY))); checkArgument( @@ -138,10 +139,6 @@ public AbstractJdbcCatalog( connectionProperties.getProperty(PASSWORD_KEY))); } - protected String getDatabaseUrl(String databaseName) { - return baseUrl + databaseName; - } - @Override public void open() throws CatalogException { // load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail @@ -274,7 +271,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath) String databaseName = tablePath.getDatabaseName(); try (Connection conn = - DriverManager.getConnection(getDatabaseUrl(databaseName), connectionProperties)) { + DriverManager.getConnection( + this.urlFunction.apply(databaseName), connectionProperties)) { DatabaseMetaData metaData = conn.getMetaData(); Optional primaryKey = getPrimaryKey( @@ -315,7 +313,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) protected Map getOptions(ObjectPath tablePath) { Map props = new HashMap<>(); props.put(CONNECTOR.key(), IDENTIFIER); - props.put(URL.key(), getDatabaseUrl(tablePath.getDatabaseName())); + props.put(URL.key(), this.urlFunction.apply(tablePath.getDatabaseName())); props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY)); props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY)); props.put(TABLE_NAME.key(), getSchemaTableName(tablePath)); @@ -576,14 +574,57 @@ protected String getSchemaTableName(ObjectPath tablePath) { throw new UnsupportedOperationException(); } + private Function calculateUrlFunction(String url) { + final String[] parts; + final int questionMarkIndex = url.indexOf('?'); + if (questionMarkIndex == -1) { + parts = url.split("/+", 3); + return dbName -> parts.length == 3 ? url.trim() : url.trim() + "/" + dbName; + } else { + String withoutParams = url.substring(0, questionMarkIndex); + String prefix = withoutParams.substring(0, withoutParams.lastIndexOf('/') + 1); + return dbName -> + dbName == null ? url : prefix + dbName + "?" + url.substring(questionMarkIndex); + } + } + /** * URL has to be without database, like "jdbc:dialect://localhost:1234/" or * "jdbc:dialect://localhost:1234" rather than "jdbc:dialect://localhost:1234/db". */ - protected static void validateJdbcUrl(String url) { - String[] parts = url.trim().split("\\/+"); - - checkArgument(parts.length == 2); + protected static String validateJdbcUrl(String url, String defaultDatabase) { + String trimmedUrl = url.trim(); + String processedUrl = + trimmedUrl.endsWith("/") + ? trimmedUrl.substring(0, trimmedUrl.length() - 1) + : trimmedUrl; + String[] parts = processedUrl.split("/+", 3); + String database; + int questionMark = trimmedUrl.indexOf('?'); + if (questionMark == -1) { + if (defaultDatabase == null) { + checkArgument(parts.length > 2); + database = parts[2]; + } else { + checkArgument( + parts.length == 2 + || (parts.length == 3 && defaultDatabase.equals(parts[2]))); + database = defaultDatabase; + } + } else { + checkArgument(parts.length > 2); + questionMark = parts[2].indexOf('?'); + if (defaultDatabase == null) { + database = parts[2].substring(0, questionMark); + } else { + checkArgument( + questionMark > -1 + && Objects.equals( + parts[2].substring(0, questionMark), defaultDatabase)); + database = defaultDatabase; + } + } + return database; } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java index ca27d36b9..57a8838ed 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactory.java @@ -51,7 +51,6 @@ public String factoryIdentifier() { @Override public Set> requiredOptions() { final Set> options = new HashSet<>(); - options.add(DEFAULT_DATABASE); options.add(USERNAME); options.add(PASSWORD); options.add(BASE_URL); @@ -61,6 +60,7 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); options.add(PROPERTY_VERSION); options.add(COMPATIBLE_MODE); return options; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java index 02c607b4b..43cbaf88a 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java @@ -34,7 +34,7 @@ public class JdbcCatalogFactoryOptions { public static final ConfigOption DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) .stringType() - .noDefaultValue(); + .defaultValue(null); public static final ConfigOption USERNAME = JdbcConnectorOptions.USERNAME; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/derby/database/DerbyFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/derby/database/DerbyFactory.java index db265d034..1f0e09f5d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/derby/database/DerbyFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/derby/database/DerbyFactory.java @@ -37,6 +37,16 @@ public JdbcDialect createDialect() { return new DerbyDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + throw new UnsupportedOperationException("Catalog for Derby is not supported yet."); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java index 371140d36..cec7f1ceb 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java @@ -27,8 +27,10 @@ class AbstractJdbcCatalogTest { @Test void testJdbcUrl() { - AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/"); - AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234"); + AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", "db"); + AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/db", null); + AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/", "db"); + AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234", "db"); } @Test @@ -36,7 +38,22 @@ void testInvalidJdbcUrl() { assertThatThrownBy( () -> AbstractJdbcCatalog.validateJdbcUrl( - "jdbc:dialect://localhost:1234/db")) + "jdbc:dialect://localhost:1234", null)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + AbstractJdbcCatalog.validateJdbcUrl( + "jdbc:dialect://localhost:1234/", null)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + AbstractJdbcCatalog.validateJdbcUrl( + "jdbc:dialect://localhost:1234/db", "")) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + AbstractJdbcCatalog.validateJdbcUrl( + "jdbc:dialect://localhost:1234/db", "not_db")) .isInstanceOf(IllegalArgumentException.class); } } diff --git a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactory.java b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactory.java index ee52ab1bf..4725e0ff4 100644 --- a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactory.java +++ b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactory.java @@ -38,6 +38,16 @@ public JdbcDialect createDialect() { return new CrateDBDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + return new CrateDBCatalog(classLoader, catalogName, null, username, pwd, baseUrl); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java index 288ac0e7d..d0e727b9a 100644 --- a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java +++ b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java @@ -135,7 +135,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { String searchPath = extractColumnValuesBySQL( - getDatabaseUrl(DEFAULT_DATABASE), "show search_path", 1, null) + urlFunction.apply(DEFAULT_DATABASE), "show search_path", 1, null) .get(0); String[] schemas = searchPath.split("\\s*,\\s*"); diff --git a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryDefaultDatabaseNullTest.java b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryDefaultDatabaseNullTest.java new file mode 100644 index 000000000..6ffae5a7e --- /dev/null +++ b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryDefaultDatabaseNullTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.cratedb.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.cratedb.CrateDBTestBase; +import org.apache.flink.connector.jdbc.cratedb.database.catalog.CrateDBCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for CrateDB database catalog factory without default database option. + * + *

This test verifies catalog creation when the default database option is not provided, ensuring + * that {@link CrateDBCatalog} is properly instantiated using only the base URL configuration. + */ +public class CrateDBFactoryDefaultDatabaseNullTest implements CrateDBTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mycratedb"; + + @BeforeEach + void setup() { + // jdbc:crate://localhost:49290/crate + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(CrateDBCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryDefaultDatabaseSameTest.java b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryDefaultDatabaseSameTest.java new file mode 100644 index 000000000..787372269 --- /dev/null +++ b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryDefaultDatabaseSameTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.cratedb.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.cratedb.CrateDBTestBase; +import org.apache.flink.connector.jdbc.cratedb.database.catalog.CrateDBCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for CrateDB database catalog factory with matching database configuration. + * + *

Verifies that {@link CrateDBCatalog} is correctly instantiated when the default database + * option ("test") matches the database specified in the base URL + * (jdbc:crate://localhost:49290/crate). + */ +public class CrateDBFactoryDefaultDatabaseSameTest implements CrateDBTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mycratedb"; + + @BeforeEach + void setup() { + // jdbc:crate://localhost:49290/crate + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + CrateDBCatalog.DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put( + JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), CrateDBCatalog.DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(CrateDBCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryTest.java b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryTest.java new file mode 100644 index 000000000..08c256a78 --- /dev/null +++ b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/database/CrateDBFactoryTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.cratedb.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.cratedb.CrateDBTestBase; +import org.apache.flink.connector.jdbc.cratedb.database.catalog.CrateDBCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for CrateDB database catalog factory. + * + *

This test follows the same pattern as other database factory tests, verifying that {@link + * CrateDBCatalog} is correctly instantiated and configured. + */ +public class CrateDBFactoryTest implements CrateDBTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mycratedb"; + + @BeforeEach + void setup() { + // jdbc:crate://localhost:49290/crate + String jdbcUrl = getMetadata().getJdbcUrl(); + // jdbc:crate://localhost:49290/ + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + CrateDBCatalog.DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl, + null); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put( + JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), CrateDBCatalog.DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(CrateDBCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/Db2Factory.java b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/Db2Factory.java index 61644709c..e7570a214 100644 --- a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/Db2Factory.java +++ b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/Db2Factory.java @@ -37,6 +37,29 @@ public JdbcDialect createDialect() { return new Db2Dialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + throw new UnsupportedOperationException("Catalog for DB2 is not supported yet."); + } + + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + String compatibleMode) { + return JdbcFactory.super.createCatalog( + classLoader, catalogName, defaultDatabase, username, pwd, baseUrl, compatibleMode); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactory.java b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactory.java index 68e7973dd..8d24a9aa7 100644 --- a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactory.java +++ b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactory.java @@ -38,6 +38,16 @@ public JdbcDialect createDialect() { return new MySqlDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + return new MySqlCatalog(classLoader, catalogName, null, username, pwd, baseUrl); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java index 27bdf93ec..d693dbe24 100644 --- a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java +++ b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java @@ -118,7 +118,7 @@ public List listTables(String databaseName) } return extractColumnValuesBySQL( - getDatabaseUrl(databaseName), + urlFunction.apply(databaseName), "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", 1, null, diff --git a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryDefaultDatabaseNullTest.java b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryDefaultDatabaseNullTest.java new file mode 100644 index 000000000..a651fb997 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryDefaultDatabaseNullTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.mysql.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.mysql.MySqlTestBase; +import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for MySQL database catalog factory without default database option. + * + *

This test verifies catalog creation when the default database option is not provided, ensuring + * that {@link MySqlCatalog} is properly instantiated using only the base URL configuration. + */ +public class MySqlFactoryDefaultDatabaseNullTest implements MySqlTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mysql_catalog"; + + @BeforeEach + void setup() { + // jdbc:mysql://localhost:56336/test + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(MySqlCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryDefaultDatabaseSameTest.java b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryDefaultDatabaseSameTest.java new file mode 100644 index 000000000..b54a43365 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryDefaultDatabaseSameTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.mysql.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.mysql.MySqlTestBase; +import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for MySQL database catalog factory with matching database configuration. + * + *

Verifies that {@link MySqlCatalog} is correctly instantiated when the default database option + * ("test") matches the database specified in the base URL (jdbc:mysql://localhost:56336/test). + */ +public class MySqlFactoryDefaultDatabaseSameTest implements MySqlTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mysql_catalog"; + protected static final String DEFAULT_DATABASE = "test"; + + @BeforeEach + void setup() { + // jdbc:mysql://localhost:56336/test + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(MySqlCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryTest.java b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryTest.java new file mode 100644 index 000000000..b1527ece5 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/database/MySqlFactoryTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.mysql.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.mysql.MySqlTestBase; +import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for MySQL database catalog factory. + * + *

This test follows the same pattern as other database factory tests, verifying that {@link + * MySqlCatalog} is correctly instantiated and configured. + */ +public class MySqlFactoryTest implements MySqlTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mysql_catalog"; + protected static final String DEFAULT_DATABASE = "test"; + + @BeforeEach + void setup() { + // jdbc:mysql://localhost:56336/test + String jdbcUrl = getMetadata().getJdbcUrl(); + // jdbc:mysql://localhost:56336/ + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl, + null); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(MySqlCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java index d5ab48389..3236aadc0 100644 --- a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java +++ b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java @@ -45,6 +45,16 @@ public JdbcDialect createDialect(String compatibleMode) { return new OceanBaseDialect(OceanBaseCompatibleMode.parse(compatibleMode)); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + return createCatalog(classLoader, catalogName, null, username, pwd, baseUrl); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryDefaultDatabaseNullTest.java b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryDefaultDatabaseNullTest.java new file mode 100644 index 000000000..b73d963af --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryDefaultDatabaseNullTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oceanbase.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase; +import org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for OceanBase database catalog factory without default database option. + * + *

This test verifies catalog creation when the default database option is not provided, ensuring + * that {@link OceanBaseCatalog} is properly instantiated using only the base URL configuration. + */ +public class OceanBaseFactoryDefaultDatabaseNullTest implements OceanBaseMysqlTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "oceanbase_mysql_catalog"; + + @BeforeEach + void setup() { + // jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(OceanBaseCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryDefaultDatabaseSameTest.java b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryDefaultDatabaseSameTest.java new file mode 100644 index 000000000..b870afb7f --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryDefaultDatabaseSameTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oceanbase.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase; +import org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for OceanBase database catalog factory with matching database configuration. + * + *

Verifies that {@link OceanBaseCatalog} is correctly instantiated when the default database + * option ("test") matches the database specified in the base URL + * (jdbc:oceanbase://localhost:port/test). + */ +public class OceanBaseFactoryDefaultDatabaseSameTest implements OceanBaseMysqlTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "oceanbase_mysql_catalog"; + protected static final String DEFAULT_DATABASE = "test"; + + @BeforeEach + void setup() { + // jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl, + null); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(OceanBaseCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryTest.java b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryTest.java new file mode 100644 index 000000000..c959cff93 --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactoryTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oceanbase.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase; +import org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for OceanBase database catalog factory. + * + *

This test follows the same pattern as other database factory tests, verifying that {@link + * OceanBaseCatalog} is correctly instantiated and configured. + */ +public class OceanBaseFactoryTest implements OceanBaseMysqlTestBase { + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "oceanbase_mysql_catalog"; + protected static final String DEFAULT_DATABASE = "test"; + + @BeforeEach + void setup() { + // jdbc:oceanbase://localhost:56336/test?serverTimezone=+09:00&useSSL=false + String jdbcUrl = getMetadata().getJdbcUrl(); + // jdbc:mysql://localhost:56336/ + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl, + null); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(OceanBaseCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java index c0bc6b76c..7989e5c08 100644 --- a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java +++ b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/database/catalog/OceanBaseCatalogITCaseBase.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.stream.Collectors; -import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; import static org.assertj.core.api.Assertions.assertThat; @@ -90,11 +89,11 @@ void setup() { catalogName, compatibleMode, defaultDatabase, + getMetadata().getUsername(), + getMetadata().getPassword(), getMetadata() .getJdbcUrl() - .substring(0, getMetadata().getJdbcUrl().lastIndexOf("/")), - getBriefAuthProperties( - getMetadata().getUsername(), getMetadata().getPassword())); + .substring(0, getMetadata().getJdbcUrl().lastIndexOf("/"))); tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); diff --git a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/OracleFactory.java b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/OracleFactory.java index 09418feab..d700b7c5d 100644 --- a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/OracleFactory.java +++ b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/OracleFactory.java @@ -37,6 +37,16 @@ public JdbcDialect createDialect() { return new OracleDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + throw new UnsupportedOperationException("Catalog for Oracle is not supported yet."); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactory.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactory.java index ddf6172ab..93c8875e1 100644 --- a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactory.java +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactory.java @@ -38,6 +38,16 @@ public JdbcDialect createDialect() { return new PostgresDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + return new PostgresCatalog(classLoader, catalogName, null, username, pwd, baseUrl); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java index def48d64f..4aff59071 100644 --- a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java @@ -187,7 +187,7 @@ public List listTables(String databaseName) throw new DatabaseNotExistException(getName(), databaseName); } - final String url = getDatabaseUrl(databaseName); + final String url = urlFunction.apply(databaseName); try (Connection conn = DriverManager.getConnection(url, connectionProperties)) { // get all schemas List schemas; diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryDefaultDatabaseNullTest.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryDefaultDatabaseNullTest.java new file mode 100644 index 000000000..6fc61d87e --- /dev/null +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryDefaultDatabaseNullTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.postgres.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.postgres.PostgresTestBase; +import org.apache.flink.connector.jdbc.postgres.database.catalog.PostgresCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for PostgreSQL database catalog factory without default database option. + * + *

This test verifies catalog creation when the default database option is not provided, ensuring + * that {@link PostgresCatalog} is properly instantiated using only the base URL configuration. + */ +class PostgresFactoryDefaultDatabaseNullTest implements PostgresTestBase { + + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mypg"; + + @BeforeEach + void setup() { + // jdbc:postgresql://localhost:53036/test?loggerLevel=OFF + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(PostgresCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryDefaultDatabaseSameTest.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryDefaultDatabaseSameTest.java new file mode 100644 index 000000000..691aedea9 --- /dev/null +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryDefaultDatabaseSameTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.postgres.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.postgres.PostgresTestBase; +import org.apache.flink.connector.jdbc.postgres.database.catalog.PostgresCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test class for PostgreSQL database catalog factory with matching database configuration. + * + *

Verifies that {@link PostgresCatalog} is correctly instantiated when the default database + * option ("test") matches the database specified in the base URL + * (jdbc:postgresql://localhost:53036/test?loggerLevel=OFF). + */ +class PostgresFactoryDefaultDatabaseSameTest implements PostgresTestBase { + + protected static String baseUrl; + protected static JdbcCatalog catalog; + protected static final String TEST_CATALOG_NAME = "mypg"; + protected static final String DEFAULT_DATABASE = "test"; + + @BeforeEach + void setup() { + // jdbc:postgresql://localhost:53036/test?loggerLevel=OFF + baseUrl = getMetadata().getJdbcUrl(); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(PostgresCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java index 1b3d23300..e295875de 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/PostgresFactoryTest.java @@ -41,7 +41,6 @@ class PostgresFactoryTest implements PostgresTestBase { protected static String baseUrl; protected static JdbcCatalog catalog; - protected static final String TEST_CATALOG_NAME = "mypg"; @BeforeEach diff --git a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/SqlServerFactory.java b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/SqlServerFactory.java index 28ffc82fa..b848511ad 100644 --- a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/SqlServerFactory.java +++ b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/SqlServerFactory.java @@ -37,6 +37,16 @@ public JdbcDialect createDialect() { return new SqlServerDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + throw new UnsupportedOperationException("Catalog for SqlServer is not supported yet."); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader, diff --git a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/TrinoFactory.java b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/TrinoFactory.java index b54d8743d..8c7bbba16 100644 --- a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/TrinoFactory.java +++ b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/TrinoFactory.java @@ -37,6 +37,16 @@ public JdbcDialect createDialect() { return new TrinoDialect(); } + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String username, + String pwd, + String baseUrl) { + throw new UnsupportedOperationException("Catalog for Trino is not supported yet."); + } + @Override public JdbcCatalog createCatalog( ClassLoader classLoader,