-
Notifications
You must be signed in to change notification settings - Fork 205
[hotfix] Support passing arbitrary database options to JDBC Catalog #183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[hotfix] Support passing arbitrary database options to JDBC Catalog #183
Conversation
|
|
||
| public static final String USER_KEY = "user"; | ||
| public static final String PASSWORD_KEY = "password"; | ||
| public static final String DATABASE_OPTIONS = "database-options"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @och5351 for sorting out the previous related works and initiating the PR.
Hi, Thank you @eskabetxe for your previous comment[1].
I'd personally prefer passing parameters based on the baseUrl[2], for a few simple reasons:
- Using the baseUrl to pass parameters can also achieve the current functionality.
- Using the baseUrl to pass parameters avoids introducing new additional parameters, preserving the original style of the URL as much as possible. From a user’s perspective, there is no need to write an extra line of parameters.
If using URL-based parameter passing across various database products fails to meet the current needs, then introducing db-options would be a good alternative.
CC @snuyanzin
[1] #83 (comment)
[2] #83
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @RocMarshal !
Thank you for your helpful review.
I agree with your point, but the current version does not support applying DB parameters in the catalog configuration (although it’s possible in dynamic tables).
Here is the current code snippet:
public AbstractJdbcCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String baseUrl,
Properties connectionProperties) {
super(catalogName, defaultDatabase);
checkNotNull(userClassLoader);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
validateJdbcUrl(baseUrl);
this.userClassLoader = userClassLoader;
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = getDatabaseUrl(defaultDatabase);
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(
connectionProperties.getProperty(PASSWORD_KEY)));
}
protected String getDatabaseUrl(String databaseName) {
return baseUrl + databaseName;
}If the base-url is given like this:
CREATE CATALOG my_mariadb_catalog WITH (
'type' = 'jdbc',
'base-url' = 'jdbc:mysql://localhost:3306/my_db?serverTimezone=UTC&tinyInt1isBit=false',
'username' = 'user',
'password' = 'pass',
'default-database' = 'my_db',
-- 'database-options' = 'serverTimezone=UTC&tinyInt1isBit=false'
);then the Flink JDBC connector treats the URL as:
jdbc:mysql://localhost:3306/my_db?serverTimezone=UTC&tinyInt1isBit=false/my_db
Because of this, I initially proposed the use of database-options.
However, I think it would also be good to make better use of base-url directly.
Therefore, I would like to suggest the following possible improvements:
Option 1:
- Change
default-databaseinJdbcCatalogFactory.javato be an optional parameter. - Remove the logic in the AbstractJdbcCatalog constructor that appends
/to base-url. - Modify the getDatabaseUrl() method in AbstractJdbcCatalog to work as follows:
- Check if defaultDatabase is provided.
- Check if the baseUrl contains ? indicating database options.
- Extract the database name from baseUrl (e.g., using split("?")[0] and checking if it ends with /).
- Return the appropriate baseUrl.
Option 2:
- Remove the default-database option from JdbcCatalogFactory.java altogether.
- Accept the entire connection URL (including database and options) in the base-url parameter.
Thank you again for your valuable review and continued support.
I would appreciate your thoughts on this proposal and am happy to discuss further.
Best regards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option 1 sounds better to me .
If it could reference the [1] content during the implementation, that would be even better.
Alternatively, other elegant implementation approaches are also welcome.
[1] #83
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I have updated the code following option 1.
After completing the changes, I will kindly ask you to review the PR again.
Thank you!
|
Hi, @RocMarshal ! I have updated the code. Here are the changelogs:
Could you please take another look when you have time?
|
RocMarshal
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx @och5351 for the update.
I left a few of comments , pls take a look.
| @@ -0,0 +1,62 @@ | |||
| package org.apache.flink.connector.jdbc.cratedb.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing License header.
| @@ -0,0 +1,65 @@ | |||
| package org.apache.flink.connector.jdbc.cratedb.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| @@ -0,0 +1,68 @@ | |||
| package org.apache.flink.connector.jdbc.cratedb.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| @@ -0,0 +1,62 @@ | |||
| package org.apache.flink.connector.jdbc.mysql.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| @@ -0,0 +1,65 @@ | |||
| package org.apache.flink.connector.jdbc.mysql.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| @@ -0,0 +1,68 @@ | |||
| package org.apache.flink.connector.jdbc.mysql.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| @@ -0,0 +1,62 @@ | |||
| package org.apache.flink.connector.jdbc.oceanbase.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
| @@ -0,0 +1,66 @@ | |||
| package org.apache.flink.connector.jdbc.oceanbase.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
| @@ -0,0 +1,68 @@ | |||
| package org.apache.flink.connector.jdbc.oceanbase.database; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| public class OceanBaseFactoryDefaultDatabaseNullTest implements OceanBaseMysqlTestBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, It would be better add the comments for all of the new introduced test classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I learn the Apache project commit manners every time, thank you! I have made the changes as you mentioned, so please check once more when you have time :)
| "Not supported option 'compatible-mode' with value: " + compatibleMode); | ||
| } | ||
|
|
||
| JdbcCatalog createCatalog( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @och5351 for the hard work.
May I know why we need introduce the method due to items form my side:
- A: If this method is not introduced, it seems that the target functionality could still be achieved.
- Taking a step(A)back, even if this method must be introduced to achieve the intended objective, I have not found any line of code that explicitly calls this method.
Pls correct me if I'm wrong. thx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @RocMarshal,
I think it is necessary due to the change in the optional option default-database.
For example, if you create a catalog like this:
CREATE CATALOG my_mariadb_catalog WITH (
'type' = 'jdbc',
'base-url' = 'jdbc:mysql://localhost:3306/mydb',
'username' = 'user',
'password' = 'pass'
-- 'default-database' = 'my_db',
);I think this constructor is necessary, and it was created to handle cases where the default-database option is not provided in factories like MySQL, Postgres, etc.
May I ask why you think this part might not be necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, The callings for
JdbcCatalog createCatalog(
ClassLoader classLoader,
String catalogName,
String username,
String pwd,
String baseUrl);
are located the setup of test classes only.
Methods about #createCatalog the main callings in SPI are shown as following: instead of the new introduced:
JdbcCatalog createCatalog(
ClassLoader classLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl);
default JdbcCatalog createCatalog(
ClassLoader classLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
String compatibleMode) {
if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
return createCatalog(classLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
}
throw new UnsupportedOperationException(
"Not supported option 'compatible-mode' with value: " + compatibleMode);
}
What I wanna to say is,
if all the above statements are valid, you could simply call the existing method directly in the test case, and set default-database to null. This way, you don’t need to introduce a new method, right?
Hope this expressed~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @RocMarshal
Oh, I completely misunderstood.
You were referring to createCatalog in JdbcFactory, but I was actually thinking about loadCatalog. I was focusing on the optionalOptions in JdbcCatalogFactory and thought a new createCatalog method was needed, but it turns out it’s not necessary.
Also, I found the createCatalog method in FactoryUtil.
Now I understand your review and really appreciate your patience.
If you have more time, could you please take a look at the changes again?
| "Not supported option 'compatible-mode' with value: " + compatibleMode); | ||
| } | ||
|
|
||
| JdbcCatalog createCatalog( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, The callings for
JdbcCatalog createCatalog(
ClassLoader classLoader,
String catalogName,
String username,
String pwd,
String baseUrl);
are located the setup of test classes only.
Methods about #createCatalog the main callings in SPI are shown as following: instead of the new introduced:
JdbcCatalog createCatalog(
ClassLoader classLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl);
default JdbcCatalog createCatalog(
ClassLoader classLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
String compatibleMode) {
if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
return createCatalog(classLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
}
throw new UnsupportedOperationException(
"Not supported option 'compatible-mode' with value: " + compatibleMode);
}
What I wanna to say is,
if all the above statements are valid, you could simply call the existing method directly in the test case, and set default-database to null. This way, you don’t need to introduce a new method, right?
Hope this expressed~
1dfcc7a to
6d06e23
Compare
6d06e23 to
c677143
Compare
1. Motivation
Currently, when creating a
JDBC Catalogin Flink SQL, there is no way to pass arbitrary connectionpropertiesto the JDBC driver. This is a significant limitation compared toJDBC Dynamic Tables, which support a generic properties option.This missing feature can lead to connection failures for certain databases that require specific driver properties. A common example is connecting to a MySQL or MariaDB server with a non-UTC timezone (e.g., 'KST'), which results in the following error:
To resolve this, the user needs to be able to pass properties like
serverTimezone=UTC. Similarly, other use cases, such as passingstringtype=unspecifiedfor PostgreSQL, are not possible with the current JDBC Catalog implementation.Previous attempts to address this (e.g., PR #74, PR #83) were initiated but not completed. This PR provides a complete and robust solution to this long-standing issue.
2. Solution
This PR introduces a new configuration option,
database-options, to theJdbcCatalogFactory. This option allows users to specify a string of key-value pairs that will be appended to the JDBC connection URL.This approach provides a flexible and universal way to configure any required JDBC driver property without adding database-specific options.
3. Implementation Details
The changes were implemented as follows:
JdbcCatalogFactory.java:
A new ConfigOption named
database-optionswas added.This option is registered in
optionalOptions()to make it non-mandatory.AbstractJdbcCatalog.java:
The
getDatabaseUrl()method was modified to check for the presence ofdatabase-options.If the options exist, they are appended to the base JDBC URL using the correct separator (
?or;based on the database dialect).Factories and Loaders:
The database-options value is now passed through the factory chain: JdbcCatalogFactory -> JdbcFactoryLoader -> database-specific factories (MySqlFactory, PostgresFactory, etc.).
The method signatures in JdbcFactory.java and JdbcFactoryLoader.java were overloaded to accept the new dbOptions parameter.
Database-Specific Implementations:
All existing database-specific catalog implementations (MySQL, PostgreSQL, Derby, OceanBase, etc.) and their corresponding factories were updated to accept and pass the dbOptions parameter to the AbstractJdbcCatalog constructor.
Usage Example
With this change, a user can now create a JDBC catalog for MySQL and specify the required timezone property as follows:
Verification
The changes have been tested locally by creating a JDBC catalog for a MariaDB database that requires the serverTimezone property.
mysql
oceanbase
postgresql
Related Issues/PRs
JIRA: FLINK-38616
Stale PRs: [FLINK-33069]Mysql and Postgres catalog support url extra parameters #74, [FLINK-33800][JDBC/Connector] Allow passing parameters to database via jdbc url #83