Skip to content

Conversation

@och5351
Copy link

@och5351 och5351 commented Dec 4, 2025


(Note: I am currently waiting for my Jira account registration to be approved, so I am unable to create a ticket for this issue at the moment. I will update the title with the Jira ticket number as soon as I gain access. If someone else would like to create it, that would also be great.)


1. Motivation

Currently, when creating a JDBC Catalog in Flink SQL, there is no way to pass arbitrary connection properties to the JDBC driver. This is a significant limitation compared to JDBC Dynamic Tables, which support a generic properties option.

This missing feature can lead to connection failures for certain databases that require specific driver properties. A common example is connecting to a MySQL or MariaDB server with a non-UTC timezone (e.g., 'KST'), which results in the following error:

com.mysql.cj.exceptions.InvalidConnectionAttributeException: The server time zone value 'KST' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specific time zone value if you want to utilize time zone support.

To resolve this, the user needs to be able to pass properties like serverTimezone=UTC. Similarly, other use cases, such as passing stringtype=unspecified for PostgreSQL, are not possible with the current JDBC Catalog implementation.

Previous attempts to address this (e.g., PR #74, PR #83) were initiated but not completed. This PR provides a complete and robust solution to this long-standing issue.

2. Solution

This PR introduces a new configuration option, database-options, to the JdbcCatalogFactory. This option allows users to specify a string of key-value pairs that will be appended to the JDBC connection URL.

  • For most databases, the options are appended as a query string, starting with ? for the first parameter and using & for subsequent ones.
  • For DB2, which uses a semicolon (;) as a separator, the implementation correctly handles this specific syntax.
    This approach provides a flexible and universal way to configure any required JDBC driver property without adding database-specific options.

3. Implementation Details

The changes were implemented as follows:

  1. JdbcCatalogFactory.java:

    A new ConfigOption named database-options was added.
    This option is registered in optionalOptions() to make it non-mandatory.

  2. AbstractJdbcCatalog.java:

    The getDatabaseUrl() method was modified to check for the presence of database-options.
    If the options exist, they are appended to the base JDBC URL using the correct separator (? or ; based on the database dialect).

  3. Factories and Loaders:

    The database-options value is now passed through the factory chain: JdbcCatalogFactory -> JdbcFactoryLoader -> database-specific factories (MySqlFactory, PostgresFactory, etc.).
    The method signatures in JdbcFactory.java and JdbcFactoryLoader.java were overloaded to accept the new dbOptions parameter.

  4. Database-Specific Implementations:

    All existing database-specific catalog implementations (MySQL, PostgreSQL, Derby, OceanBase, etc.) and their corresponding factories were updated to accept and pass the dbOptions parameter to the AbstractJdbcCatalog constructor.

  5. Usage Example

    With this change, a user can now create a JDBC catalog for MySQL and specify the required timezone property as follows:

CREATE CATALOG my_mariadb_catalog WITH (
  'type' = 'jdbc',
  'base-url' = 'jdbc:mysql://localhost:3306',
  'username' = 'user',
  'password' = 'pass',
  'default-database' = 'my_db',
  'database-options' = 'serverTimezone=UTC&tinyInt1isBit=false'
);
  1. Verification

    The changes have been tested locally by creating a JDBC catalog for a MariaDB database that requires the serverTimezone property.

mysql

image

oceanbase

image image

postgresql

image
  1. Related Issues/PRs

    JIRA: FLINK-38616
    Stale PRs: [FLINK-33069]Mysql and Postgres catalog support url extra parameters #74, [FLINK-33800][JDBC/Connector] Allow passing parameters to database via jdbc url #83

@och5351 och5351 changed the title [Improve] Support passing arbitrary database options to JDBC Catalog [feature] Support passing arbitrary database options to JDBC Catalog Dec 4, 2025
@och5351 och5351 changed the title [feature] Support passing arbitrary database options to JDBC Catalog [Connector/JDBC] Support passing arbitrary database options to JDBC Catalog Dec 4, 2025

public static final String USER_KEY = "user";
public static final String PASSWORD_KEY = "password";
public static final String DATABASE_OPTIONS = "database-options";
Copy link
Contributor

@RocMarshal RocMarshal Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @och5351 for sorting out the previous related works and initiating the PR.

Hi, Thank you @eskabetxe for your previous comment[1].

I'd personally prefer passing parameters based on the baseUrl[2], for a few simple reasons:

  • Using the baseUrl to pass parameters can also achieve the current functionality.
  • Using the baseUrl to pass parameters avoids introducing new additional parameters, preserving the original style of the URL as much as possible. From a user’s perspective, there is no need to write an extra line of parameters.

If using URL-based parameter passing across various database products fails to meet the current needs, then introducing db-options would be a good alternative.

CC @snuyanzin

[1] #83 (comment)
[2] #83

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @RocMarshal !

Thank you for your helpful review.

I agree with your point, but the current version does not support applying DB parameters in the catalog configuration (although it’s possible in dynamic tables).

Here is the current code snippet:

public AbstractJdbcCatalog(
        ClassLoader userClassLoader,
        String catalogName,
        String defaultDatabase,
        String baseUrl,
        Properties connectionProperties) {
    super(catalogName, defaultDatabase);

    checkNotNull(userClassLoader);
    checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));

    validateJdbcUrl(baseUrl);

    this.userClassLoader = userClassLoader;
    this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
    this.defaultUrl = getDatabaseUrl(defaultDatabase);
    this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
    checkArgument(
            !StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
    checkArgument(
            !StringUtils.isNullOrWhitespaceOnly(
                    connectionProperties.getProperty(PASSWORD_KEY)));
}

protected String getDatabaseUrl(String databaseName) {
    return baseUrl + databaseName;
}

If the base-url is given like this:

CREATE CATALOG my_mariadb_catalog WITH (
  'type' = 'jdbc',
  'base-url' = 'jdbc:mysql://localhost:3306/my_db?serverTimezone=UTC&tinyInt1isBit=false',
  'username' = 'user',
  'password' = 'pass',
  'default-database' = 'my_db',
  -- 'database-options' = 'serverTimezone=UTC&tinyInt1isBit=false'
);

then the Flink JDBC connector treats the URL as:

jdbc:mysql://localhost:3306/my_db?serverTimezone=UTC&tinyInt1isBit=false/my_db

Because of this, I initially proposed the use of database-options.
However, I think it would also be good to make better use of base-url directly.

Therefore, I would like to suggest the following possible improvements:

Option 1:

  1. Change default-database in JdbcCatalogFactory.java to be an optional parameter.
  2. Remove the logic in the AbstractJdbcCatalog constructor that appends / to base-url.
  3. Modify the getDatabaseUrl() method in AbstractJdbcCatalog to work as follows:
  • Check if defaultDatabase is provided.
  • Check if the baseUrl contains ? indicating database options.
  • Extract the database name from baseUrl (e.g., using split("?")[0] and checking if it ends with /).
  • Return the appropriate baseUrl.

Option 2:

  1. Remove the default-database option from JdbcCatalogFactory.java altogether.
  2. Accept the entire connection URL (including database and options) in the base-url parameter.

Thank you again for your valuable review and continued support.
I would appreciate your thoughts on this proposal and am happy to discuss further.

Best regards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 1 sounds better to me .

If it could reference the [1] content during the implementation, that would be even better.
Alternatively, other elegant implementation approaches are also welcome.

[1] #83

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I have updated the code following option 1.
After completing the changes, I will kindly ask you to review the PR again.

Thank you!

@och5351 och5351 requested a review from RocMarshal December 9, 2025 08:49
@och5351 och5351 changed the title [Connector/JDBC] Support passing arbitrary database options to JDBC Catalog [hotfix][Connector/JDBC] Support passing arbitrary database options to JDBC Catalog Dec 9, 2025
@och5351 och5351 changed the title [hotfix][Connector/JDBC] Support passing arbitrary database options to JDBC Catalog [hotfix] Support passing arbitrary database options to JDBC Catalog Dec 11, 2025
@och5351
Copy link
Author

och5351 commented Dec 11, 2025

Hi, @RocMarshal !

I have updated the code. Here are the changelogs:

  1. Made the default-database option optional.
  2. Added inspection base URL logic referring to PR [FLINK-33800][JDBC/Connector] Allow passing parameters to database via jdbc url #83. (Thank you, @snuyanzin)
  3. Added more test cases.

Could you please take another look when you have time?

Well.. I think not necessary PostgreSQL, MySQL, Oceanbase, CrateDB Testcases because org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalogTest.java

Copy link
Contributor

@RocMarshal RocMarshal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx @och5351 for the update.

I left a few of comments , pls take a look.

@@ -0,0 +1,62 @@
package org.apache.flink.connector.jdbc.cratedb.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing License header.

@@ -0,0 +1,65 @@
package org.apache.flink.connector.jdbc.cratedb.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -0,0 +1,68 @@
package org.apache.flink.connector.jdbc.cratedb.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -0,0 +1,62 @@
package org.apache.flink.connector.jdbc.mysql.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -0,0 +1,65 @@
package org.apache.flink.connector.jdbc.mysql.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -0,0 +1,68 @@
package org.apache.flink.connector.jdbc.mysql.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -0,0 +1,62 @@
package org.apache.flink.connector.jdbc.oceanbase.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@@ -0,0 +1,66 @@
package org.apache.flink.connector.jdbc.oceanbase.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@@ -0,0 +1,68 @@
package org.apache.flink.connector.jdbc.oceanbase.database;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


import static org.assertj.core.api.Assertions.assertThat;

public class OceanBaseFactoryDefaultDatabaseNullTest implements OceanBaseMysqlTestBase {
Copy link
Contributor

@RocMarshal RocMarshal Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, It would be better add the comments for all of the new introduced test classes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RocMarshal

I learn the Apache project commit manners every time, thank you! I have made the changes as you mentioned, so please check once more when you have time :)

@och5351 och5351 requested a review from RocMarshal December 12, 2025 05:42
"Not supported option 'compatible-mode' with value: " + compatibleMode);
}

JdbcCatalog createCatalog(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank @och5351 for the hard work.

May I know why we need introduce the method due to items form my side:

  • A: If this method is not introduced, it seems that the target functionality could still be achieved.
  • Taking a step(A)back, even if this method must be introduced to achieve the intended objective, I have not found any line of code that explicitly calls this method.

Pls correct me if I'm wrong. thx

Copy link
Author

@och5351 och5351 Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @RocMarshal,

I think it is necessary due to the change in the optional option default-database.

For example, if you create a catalog like this:

CREATE CATALOG my_mariadb_catalog WITH (
  'type' = 'jdbc',
  'base-url' = 'jdbc:mysql://localhost:3306/mydb',
  'username' = 'user',
  'password' = 'pass'
  -- 'default-database' = 'my_db',
);

I think this constructor is necessary, and it was created to handle cases where the default-database option is not provided in factories like MySQL, Postgres, etc.

May I ask why you think this part might not be necessary?

Copy link
Contributor

@RocMarshal RocMarshal Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, The callings for

    JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String username,
            String pwd,
            String baseUrl);

are located the setup of test classes only.

Methods about #createCatalog the main callings in SPI are shown as following: instead of the new introduced:


    JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl);

    default JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl,
            String compatibleMode) {
        if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
            return createCatalog(classLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
        }
        throw new UnsupportedOperationException(
                "Not supported option 'compatible-mode' with value: " + compatibleMode);
    }

What I wanna to say is,
if all the above statements are valid, you could simply call the existing method directly in the test case, and set default-database to null. This way, you don’t need to introduce a new method, right?

Hope this expressed~

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @RocMarshal

Oh, I completely misunderstood.
You were referring to createCatalog in JdbcFactory, but I was actually thinking about loadCatalog. I was focusing on the optionalOptions in JdbcCatalogFactory and thought a new createCatalog method was needed, but it turns out it’s not necessary.
Also, I found the createCatalog method in FactoryUtil.

Now I understand your review and really appreciate your patience.

If you have more time, could you please take a look at the changes again?

@och5351 och5351 requested a review from RocMarshal December 13, 2025 04:21
"Not supported option 'compatible-mode' with value: " + compatibleMode);
}

JdbcCatalog createCatalog(
Copy link
Contributor

@RocMarshal RocMarshal Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, The callings for

    JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String username,
            String pwd,
            String baseUrl);

are located the setup of test classes only.

Methods about #createCatalog the main callings in SPI are shown as following: instead of the new introduced:


    JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl);

    default JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl,
            String compatibleMode) {
        if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
            return createCatalog(classLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
        }
        throw new UnsupportedOperationException(
                "Not supported option 'compatible-mode' with value: " + compatibleMode);
    }

What I wanna to say is,
if all the above statements are valid, you could simply call the existing method directly in the test case, and set default-database to null. This way, you don’t need to introduce a new method, right?

Hope this expressed~

@och5351 och5351 force-pushed the feature/jdbc-catalog-enabled-db-option-param branch 2 times, most recently from 1dfcc7a to 6d06e23 Compare December 16, 2025 00:35
@och5351 och5351 force-pushed the feature/jdbc-catalog-enabled-db-option-param branch from 6d06e23 to c677143 Compare December 16, 2025 00:50
@och5351 och5351 requested a review from RocMarshal December 16, 2025 01:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants