Skip to content

Commit e198a25

Browse files
authored
[FLINK-34572] Support OceanBase Jdbc Catalog (#109)
1 parent 2ec0b81 commit e198a25

File tree

27 files changed

+1412
-78
lines changed

27 files changed

+1412
-78
lines changed

docs/content.zh/docs/connectors/table/jdbc.md

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,9 @@ JDBC Catalog
432432

433433
`JdbcCatalog` 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。
434434

435-
目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。
435+
目前,JDBC Catalog 有以下实现:Postgres Catalog、MySQL Catalog、CrateDB Catalog 和 OceanBase Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。
436436

437437
```java
438-
// Postgres Catalog & MySQL Catalog 支持的方法
439438
databaseExists(String databaseName);
440439
listDatabases();
441440
getDatabase(String databaseName);
@@ -450,17 +449,19 @@ tableExists(ObjectPath tablePath);
450449

451450
### JDBC Catalog 的使用
452451

453-
本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。
452+
本小节主要描述如何创建并使用 JDBC Catalog。
454453
请参阅 [依赖](#依赖) 部分了解如何配置 JDBC 连接器和相应的驱动。
455454

456455
JDBC catalog 支持以下参数:
457456
- `name`:必填,catalog 的名称。
458457
- `default-database`:必填,默认要连接的数据库。
459-
- `username`:必填,Postgres/MySQL 账户的用户名
458+
- `username`:必填,数据库账户的用户名
460459
- `password`:必填,账户的密码。
461460
- `base-url`:必填,(不应该包含数据库名)
462461
- 对于 Postgres Catalog `base-url` 应为 `"jdbc:postgresql://<ip>:<port>"` 的格式。
463462
- 对于 MySQL Catalog `base-url` 应为 `"jdbc:mysql://<ip>:<port>"` 的格式。
463+
- 对于 OceanBase Catalog `base-url` 应为 `"jdbc:oceanbase://<ip>:<port>"` 的格式。
464+
- `compatible-mode`: 选填,数据库的兼容模式。
464465

465466
{{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
466467
{{< tab "SQL" >}}
@@ -656,6 +657,42 @@ SELECT * FROM mycatalog.crate.`custom_schema.test_table2`
656657
SELECT * FROM crate.`custom_schema.test_table2`;
657658
SELECT * FROM `custom_schema.test_table2`;
658659
```
660+
661+
<a name="jdbc-catalog-for-oceanbase"></a>
662+
663+
### JDBC Catalog for OceanBase
664+
665+
<a name="oceanbase-metaspace-mapping"></a>
666+
667+
#### OceanBase 元空间映射
668+
669+
OceanBase 数据库支持多租户管理,每个租户可以工作在 MySQL 兼容模式或 Oracle 兼容模式。在 OceanBase 的 MySQL 模式上,一个租户中有数据库和表,就像 MySQL 数据库中的数据库和表一样,但没有 schema。在 OceanBase 的 Oracle 模式下,一个租户中有 schema 和表,就像 Oracle 数据库中的 schema 和表一样,但没有数据库。
670+
671+
在 Flink 中,查询 OceanBase Catalog 注册的表时,OceanBase MySQL 模式下可以使用 `database.table_name` 或只使用 `table_name`,OceanBase Oracle 模式下可以使用 `schema.table_name` 或只使用 `table_name`
672+
673+
因此,Flink Catalog 和 OceanBase catalog 之间的元空间映射如下:
674+
675+
| Flink Catalog Metaspace Structure | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
676+
|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
677+
| catalog name (defined in Flink only) | N/A | N/A |
678+
| database name | database name | schema name |
679+
| table name | table name | table name |
680+
681+
Flink 中的 OceanBase 表的完整路径应该是 ``"`<catalog>`.`<db or schema>`.`<table>`"``
682+
683+
这里提供了一些访问 OceanBase 表的例子:
684+
685+
```sql
686+
-- 扫描 默认数据库 'mydb' 中的 'test_table' 表
687+
SELECT * FROM oceanbase_catalog.mydb.test_table;
688+
SELECT * FROM mydb.test_table;
689+
SELECT * FROM test_table;
690+
691+
-- 扫描 'given_database' 数据库中的 'test_table2' 表,
692+
SELECT * FROM oceanbase_catalog.given_database.test_table2;
693+
SELECT * FROM given_database.test_table2;
694+
```
695+
659696
<a name="data-type-mapping"></a>
660697

661698
数据类型映射

docs/content/docs/connectors/table/jdbc.md

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -441,10 +441,9 @@ JDBC Catalog
441441

442442
The `JdbcCatalog` enables users to connect Flink to relational databases over JDBC protocol.
443443

444-
Currently, there are two JDBC catalog implementations, Postgres Catalog and MySQL Catalog. They support the following catalog methods. Other methods are currently not supported.
444+
Currently, there are following JDBC catalog implementations: Postgres Catalog, MySQL Catalog, CrateDB Catalog and OceanBase Catalog. They support the following catalog methods. Other methods are currently not supported.
445445

446446
```java
447-
// The supported methods by Postgres & MySQL Catalog.
448447
databaseExists(String databaseName);
449448
listDatabases();
450449
getDatabase(String databaseName);
@@ -457,17 +456,19 @@ Other `Catalog` methods are currently not supported.
457456

458457
### Usage of JDBC Catalog
459458

460-
The section mainly describes how to create and use a Postgres Catalog or MySQL Catalog.
459+
The section mainly describes how to create and use a JDBC Catalog.
461460
Please refer to [Dependencies](#dependencies) section for how to setup a JDBC connector and the corresponding driver.
462461

463462
The JDBC catalog supports the following options:
464463
- `name`: required, name of the catalog.
465464
- `default-database`: required, default database to connect to.
466-
- `username`: required, username of Postgres/MySQL account.
465+
- `username`: required, username of database account.
467466
- `password`: required, password of the account.
468467
- `base-url`: required (should not contain the database name)
469468
- for Postgres Catalog this should be `"jdbc:postgresql://<ip>:<port>"`
470469
- for MySQL Catalog this should be `"jdbc:mysql://<ip>:<port>"`
470+
- for OceanBase Catalog this should be `jdbc:oceanbase://<ip>:<port>`
471+
- `compatible-mode`: optional, the compatible mode of database.
471472

472473
{{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}}
473474
{{< tab "SQL" >}}
@@ -654,6 +655,37 @@ SELECT * FROM crate.`custom_schema.test_table2`;
654655
SELECT * FROM `custom_schema.test_table2`;
655656
```
656657

658+
### JDBC Catalog for OceanBase
659+
660+
#### OceanBase Metaspace Mapping
661+
662+
OceanBase database supports multiple tenant management, and each tenant can work at MySQL compatible mode or Oracle compatible mode. On MySQL mode of OceanBase, there are databases and tables but no schema in one tenant, these objects just like databases and tables in the MySQL database. On Oracle mode of OceanBase, there are schemas and tables but no database in one tenant, these objects just like schemas and tables in the Oracle database.
663+
664+
In Flink, when querying tables registered by OceanBase Catalog, users can use either `database.table_name` or just `table_name` on OceanBase MySQL mode, or use either `schema.table_name` or just `table_name` on OceanBase Oracle mode.
665+
666+
Therefore, the metaspace mapping between Flink Catalog and OceanBase is as following:
667+
668+
| Flink Catalog Metaspace Structure | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
669+
|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
670+
| catalog name (defined in Flink only) | N/A | N/A |
671+
| database name | database name | schema name |
672+
| table name | table name | table name |
673+
674+
The full path of OceanBase table in Flink should be "`<catalog>`.`<db or schema>`.`<table>`".
675+
676+
Here are some examples to access OceanBase tables:
677+
678+
```sql
679+
-- scan table 'test_table', the default database or schema is 'mydb'.
680+
SELECT * FROM oceanbase_catalog.mydb.test_table;
681+
SELECT * FROM mydb.test_table;
682+
SELECT * FROM test_table;
683+
684+
-- scan table 'test_table' with the given database or schema.
685+
SELECT * FROM oceanbase_catalog.given_database.test_table2;
686+
SELECT * FROM given_database.test_table2;
687+
```
688+
657689
Data Type Mapping
658690
----------------
659691
Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.connector.jdbc.core.database.catalog;
2020

21+
import org.apache.flink.annotation.PublicEvolving;
2122
import org.apache.flink.connector.jdbc.core.table.JdbcDynamicTableFactory;
2223
import org.apache.flink.table.api.Schema;
2324
import org.apache.flink.table.api.ValidationException;
@@ -87,6 +88,7 @@
8788
import static org.apache.flink.util.Preconditions.checkNotNull;
8889

8990
/** Abstract catalog for any JDBC catalogs. */
91+
@PublicEvolving
9092
public abstract class AbstractJdbcCatalog extends AbstractCatalog implements JdbcCatalog {
9193

9294
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
@@ -127,7 +129,7 @@ public AbstractJdbcCatalog(
127129

128130
this.userClassLoader = userClassLoader;
129131
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
130-
this.defaultUrl = this.baseUrl + defaultDatabase;
132+
this.defaultUrl = getDatabaseUrl(defaultDatabase);
131133
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
132134
checkArgument(
133135
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
@@ -136,6 +138,10 @@ public AbstractJdbcCatalog(
136138
connectionProperties.getProperty(PASSWORD_KEY)));
137139
}
138140

141+
protected String getDatabaseUrl(String databaseName) {
142+
return baseUrl + databaseName;
143+
}
144+
139145
@Override
140146
public void open() throws CatalogException {
141147
// load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail
@@ -266,9 +272,9 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
266272
}
267273

268274
String databaseName = tablePath.getDatabaseName();
269-
String dbUrl = baseUrl + databaseName;
270275

271-
try (Connection conn = DriverManager.getConnection(dbUrl, connectionProperties)) {
276+
try (Connection conn =
277+
DriverManager.getConnection(getDatabaseUrl(databaseName), connectionProperties)) {
272278
DatabaseMetaData metaData = conn.getMetaData();
273279
Optional<UniqueConstraint> primaryKey =
274280
getPrimaryKey(
@@ -299,19 +305,23 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
299305
pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
300306
Schema tableSchema = schemaBuilder.build();
301307

302-
Map<String, String> props = new HashMap<>();
303-
props.put(CONNECTOR.key(), IDENTIFIER);
304-
props.put(URL.key(), dbUrl);
305-
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
306-
props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY));
307-
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
308-
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
308+
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath));
309309
} catch (Exception e) {
310310
throw new CatalogException(
311311
String.format("Failed getting table %s", tablePath.getFullName()), e);
312312
}
313313
}
314314

315+
protected Map<String, String> getOptions(ObjectPath tablePath) {
316+
Map<String, String> props = new HashMap<>();
317+
props.put(CONNECTOR.key(), IDENTIFIER);
318+
props.put(URL.key(), getDatabaseUrl(tablePath.getDatabaseName()));
319+
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
320+
props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY));
321+
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
322+
return props;
323+
}
324+
315325
@Override
316326
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
317327
throws TableNotExistException, CatalogException {

flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBCatalog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
134134
}
135135

136136
String searchPath =
137-
extractColumnValuesBySQL(baseUrl + DEFAULT_DATABASE, "show search_path", 1, null)
137+
extractColumnValuesBySQL(
138+
getDatabaseUrl(DEFAULT_DATABASE), "show search_path", 1, null)
138139
.get(0);
139140
String[] schemas = searchPath.split("\\s*,\\s*");
140141

flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public List<String> listTables(String databaseName)
118118
}
119119

120120
return extractColumnValuesBySQL(
121-
baseUrl + databaseName,
121+
getDatabaseUrl(databaseName),
122122
"SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
123123
1,
124124
null,

flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/OceanBaseFactory.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
2323
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
2424
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
25+
import org.apache.flink.connector.jdbc.oceanbase.database.catalog.OceanBaseCatalog;
26+
import org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseCompatibleMode;
2527
import org.apache.flink.connector.jdbc.oceanbase.database.dialect.OceanBaseDialect;
2628

27-
import javax.annotation.Nonnull;
28-
2929
/** Factory for {@link OceanBaseDialect}. */
3030
@Internal
3131
public class OceanBaseFactory implements JdbcFactory {
@@ -37,13 +37,12 @@ public boolean acceptsURL(String url) {
3737

3838
@Override
3939
public JdbcDialect createDialect() {
40-
throw new UnsupportedOperationException(
41-
"Can't create JdbcDialect without compatible mode for OceanBase");
40+
return createDialect(null);
4241
}
4342

4443
@Override
45-
public JdbcDialect createDialect(@Nonnull String compatibleMode) {
46-
return new OceanBaseDialect(compatibleMode);
44+
public JdbcDialect createDialect(String compatibleMode) {
45+
return new OceanBaseDialect(OceanBaseCompatibleMode.parse(compatibleMode));
4746
}
4847

4948
@Override
@@ -54,6 +53,26 @@ public JdbcCatalog createCatalog(
5453
String username,
5554
String pwd,
5655
String baseUrl) {
57-
throw new UnsupportedOperationException("Catalog for OceanBase is not supported yet.");
56+
return createCatalog(
57+
classLoader, catalogName, defaultDatabase, username, pwd, baseUrl, null);
58+
}
59+
60+
@Override
61+
public JdbcCatalog createCatalog(
62+
ClassLoader classLoader,
63+
String catalogName,
64+
String defaultDatabase,
65+
String username,
66+
String pwd,
67+
String baseUrl,
68+
String compatibleMode) {
69+
return new OceanBaseCatalog(
70+
classLoader,
71+
catalogName,
72+
OceanBaseCompatibleMode.parse(compatibleMode),
73+
defaultDatabase,
74+
username,
75+
pwd,
76+
baseUrl);
5877
}
5978
}

0 commit comments

Comments
 (0)