diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java index e27dd8526..0ba4dfd01 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java @@ -173,7 +173,7 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) : TimestampData.fromTimestamp((Timestamp) val); case CHAR: case VARCHAR: - return val -> StringData.fromString((String) val); + return val -> StringData.fromString(val == null ? null : val.toString()); case BINARY: case VARBINARY: return val -> val; diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java index 5fc19ea36..04a21255a 100644 --- a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java @@ -84,6 +84,7 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper { private static final String PG_CHARACTER_ARRAY = "_character"; private static final String PG_CHARACTER_VARYING = "varchar"; private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + private static final String PG_UUID = "uuid"; @Override public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) @@ -176,6 +177,8 @@ protected DataType getMapping(String pgType, int precision, int scale) { return DataTypes.DATE(); case PG_DATE_ARRAY: return DataTypes.ARRAY(DataTypes.DATE()); + case PG_UUID: + return DataTypes.VARCHAR(36); default: return null; } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java index ff7eabd77..c829f19df 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java @@ -193,4 +193,24 @@ void testSerialTypes() { + "9223372036854775807, " + "9223372036854775807]]"); } + + @Test + void testUuidTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1, 123e4567-e89b-12d3-a456-426614174000]]"); + } + + @Test + void testNullUuidTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE2)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1, null]]"); + } } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java index 56da37c31..9ae3b9c4b 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java @@ -73,7 +73,9 @@ void testListTables() throws DatabaseNotExistException { "public.serial_table", "public.t1", "public.t4", - "public.t5")); + "public.t5", + "public.uuid_table", + "public.uuid_table2")); actual = catalog.listTables(TEST_DB); @@ -186,4 +188,19 @@ public void testSerialDataTypes() throws TableNotExistException { assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema); } + + @Test + void testUuidDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE)); + assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema); + } + + @Test + void testNullUuidDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE2)); + assertThat(table.getUnresolvedSchema()).isEqualTo(getNullUuidTable().schema); + } } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java index f3d0d1c2b..6477d9a13 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java @@ -55,6 +55,8 @@ private static DatabaseMetadata getStaticMetadata() { protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2"; protected static final String TABLE_ARRAY_TYPE = "array_table"; protected static final String TABLE_SERIAL_TYPE = "serial_table"; + protected static final String TABLE_UUID_TYPE = "uuid_table"; + protected static final String TABLE_UUID_TYPE2 = "uuid_table2"; protected static String baseUrl; protected static PostgresCatalog catalog; @@ -108,6 +110,11 @@ static void init() throws SQLException { createTable( PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), getSerialTable().pgSchemaSql); + createTable( + PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE), getUuidTable().pgSchemaSql); + createTable( + PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2), + getNullUuidTable().pgSchemaSql); executeSQL( PostgresCatalog.DEFAULT_DATABASE, @@ -126,6 +133,15 @@ static void init() throws SQLException { PostgresCatalog.DEFAULT_DATABASE, String.format( "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values)); + executeSQL( + PostgresCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_UUID_TYPE, getUuidTable().values)); + executeSQL( + PostgresCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", + TABLE_UUID_TYPE2, getNullUuidTable().values)); } @AfterAll @@ -162,6 +178,14 @@ static void afterAll() throws SQLException { PostgresCatalog.DEFAULT_DATABASE, String.format( "DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE))); + executeSQL( + PostgresCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE))); + executeSQL( + PostgresCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2))); } public static void createTable(PostgresTablePath tablePath, String tableSchemaSql) @@ -393,4 +417,26 @@ public static TestTable getSerialTable() { + "9223372036854775807," + "9223372036854775807"); } + + public static TestTable getUuidTable() { + String uuid1 = "123e4567-e89b-12d3-a456-426614174000"; + return new TestTable( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("uid_col", DataTypes.VARCHAR(36)) + .build(), + "id INT, " + "uid_col UUID", + String.format("1, '%s'", uuid1)); + } + + public static TestTable getNullUuidTable() { + + return new TestTable( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("uid_col", DataTypes.VARCHAR(36)) + .build(), + "id INT, " + "uid_col UUID", + "1, NULL"); + } } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java index 0ad1fb55f..dc9290de9 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java @@ -34,6 +34,8 @@ class PostgresDialectTest extends JdbcDialectTest implements PostgresTestBase { @Override protected List testData() { return Arrays.asList( + createTestItem("STRING"), + createTestItem("ARRAY"), createTestItem("CHAR"), createTestItem("VARCHAR"), createTestItem("BOOLEAN"), diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java index 079260d94..fb208d087 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java @@ -44,6 +44,8 @@ protected TableRow createInputTable() { return tableRow( "jdbDynamicTableSource", field("id", DataTypes.BIGINT().notNull()), + // uuid test field + field("uid_col", dbType("uuid"), DataTypes.STRING().notNull()), field("decimal_col", DataTypes.DECIMAL(10, 4)), field("timestamp6_col", DataTypes.TIMESTAMP(6)), // other fields @@ -53,9 +55,14 @@ protected TableRow createInputTable() { } protected List getTestData() { + + String uuid1 = "123e4567-e89b-12d3-a456-426614174000"; + String uuid2 = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"; + return Arrays.asList( Row.of( 1L, + uuid1, BigDecimal.valueOf(100.1234), LocalDateTime.parse("2020-01-01T15:35:00.123456"), 1.175E-37F, @@ -63,6 +70,7 @@ protected List getTestData() { LocalTime.parse("15:35")), Row.of( 2L, + uuid2, BigDecimal.valueOf(101.1234), LocalDateTime.parse("2020-01-01T15:36:01.123456"), -1.175E-37F, diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java index 2d083e051..220df3b59 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java @@ -42,7 +42,11 @@ public PostgresMetadata(PostgreSQLContainer container) { public PostgresMetadata(JdbcDatabaseContainer container, boolean hasXaEnabled) { this.username = container.getUsername(); this.password = container.getPassword(); - this.url = container.getJdbcUrl(); + String baseUrl = container.getJdbcUrl(); + this.url = + baseUrl.contains("?") + ? baseUrl + "&stringtype=unspecified" + : baseUrl + "?stringtype=unspecified"; this.driver = container.getDriverClassName(); this.version = container.getDockerImageName(); this.xaEnabled = hasXaEnabled;