Skip to content

Commit e943ec6

Browse files
author
och5351
committed
[hotfix][Connector/JDBC] Support PostgreSQL uuid type
1 parent 140f179 commit e943ec6

File tree

8 files changed

+100
-3
lines changed

8 files changed

+100
-3
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
173173
: TimestampData.fromTimestamp((Timestamp) val);
174174
case CHAR:
175175
case VARCHAR:
176-
return val -> StringData.fromString((String) val);
176+
return val -> StringData.fromString(val == null ? null : val.toString());
177177
case BINARY:
178178
case VARBINARY:
179179
return val -> val;

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper {
8484
private static final String PG_CHARACTER_ARRAY = "_character";
8585
private static final String PG_CHARACTER_VARYING = "varchar";
8686
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
87+
private static final String PG_UUID = "uuid";
8788

8889
@Override
8990
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
@@ -176,6 +177,8 @@ protected DataType getMapping(String pgType, int precision, int scale) {
176177
return DataTypes.DATE();
177178
case PG_DATE_ARRAY:
178179
return DataTypes.ARRAY(DataTypes.DATE());
180+
case PG_UUID:
181+
return DataTypes.VARCHAR(36);
179182
default:
180183
return null;
181184
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,24 @@ void testSerialTypes() {
193193
+ "9223372036854775807, "
194194
+ "9223372036854775807]]");
195195
}
196+
197+
@Test
198+
void testUuidTypes() {
199+
List<Row> results =
200+
CollectionUtil.iteratorToList(
201+
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE))
202+
.execute()
203+
.collect());
204+
assertThat(results).hasToString("[+I[1, 123e4567-e89b-12d3-a456-426614174000]]");
205+
}
206+
207+
@Test
208+
void testNullUuidTypes() {
209+
List<Row> results =
210+
CollectionUtil.iteratorToList(
211+
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE2))
212+
.execute()
213+
.collect());
214+
assertThat(results).hasToString("[+I[1, null]]");
215+
}
196216
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ void testListTables() throws DatabaseNotExistException {
7373
"public.serial_table",
7474
"public.t1",
7575
"public.t4",
76-
"public.t5"));
76+
"public.t5",
77+
"public.uuid_table"));
7778

7879
actual = catalog.listTables(TEST_DB);
7980

@@ -186,4 +187,11 @@ public void testSerialDataTypes() throws TableNotExistException {
186187

187188
assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema);
188189
}
190+
191+
@Test
192+
void testUuidDataTypes() throws TableNotExistException {
193+
CatalogBaseTable table =
194+
catalog.getTable(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
195+
assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema);
196+
}
189197
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ private static DatabaseMetadata getStaticMetadata() {
5555
protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
5656
protected static final String TABLE_ARRAY_TYPE = "array_table";
5757
protected static final String TABLE_SERIAL_TYPE = "serial_table";
58+
protected static final String TABLE_UUID_TYPE = "uuid_table";
59+
protected static final String TABLE_UUID_TYPE2 = "uuid_table2";
5860

5961
protected static String baseUrl;
6062
protected static PostgresCatalog catalog;
@@ -108,6 +110,12 @@ static void init() throws SQLException {
108110
createTable(
109111
PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
110112
getSerialTable().pgSchemaSql);
113+
createTable(
114+
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE),
115+
getUuidTable().pgSchemaSql);
116+
createTable(
117+
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
118+
getNullUuidTable().pgSchemaSql);
111119

112120
executeSQL(
113121
PostgresCatalog.DEFAULT_DATABASE,
@@ -126,6 +134,14 @@ static void init() throws SQLException {
126134
PostgresCatalog.DEFAULT_DATABASE,
127135
String.format(
128136
"insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values));
137+
executeSQL(
138+
PostgresCatalog.DEFAULT_DATABASE,
139+
String.format(
140+
"insert into %s values (%s);", TABLE_UUID_TYPE, getUuidTable().values));
141+
executeSQL(
142+
PostgresCatalog.DEFAULT_DATABASE,
143+
String.format(
144+
"insert into %s values (%s);", TABLE_UUID_TYPE2, getNullUuidTable().values));
129145
}
130146

131147
@AfterAll
@@ -162,6 +178,14 @@ static void afterAll() throws SQLException {
162178
PostgresCatalog.DEFAULT_DATABASE,
163179
String.format(
164180
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE)));
181+
executeSQL(
182+
PostgresCatalog.DEFAULT_DATABASE,
183+
String.format(
184+
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE)));
185+
executeSQL(
186+
PostgresCatalog.DEFAULT_DATABASE,
187+
String.format(
188+
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2)));
165189
}
166190

167191
public static void createTable(PostgresTablePath tablePath, String tableSchemaSql)
@@ -393,4 +417,32 @@ public static TestTable getSerialTable() {
393417
+ "9223372036854775807,"
394418
+ "9223372036854775807");
395419
}
420+
421+
public static TestTable getUuidTable() {
422+
String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
423+
return new TestTable(
424+
Schema.newBuilder()
425+
.column("id", DataTypes.INT())
426+
.column("uid_col", DataTypes.VARCHAR(36))
427+
.build(),
428+
"id INT, "
429+
+ "uid_col UUID",
430+
String.format("1, '%s'", uuid1)
431+
432+
);
433+
}
434+
435+
public static TestTable getNullUuidTable(){
436+
437+
return new TestTable(
438+
Schema.newBuilder()
439+
.column("id", DataTypes.INT())
440+
.column("uid_col", DataTypes.VARCHAR(36))
441+
.build(),
442+
"id INT, "
443+
+ "uid_col UUID",
444+
"1, NULL"
445+
446+
);
447+
}
396448
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class PostgresDialectTest extends JdbcDialectTest implements PostgresTestBase {
3434
@Override
3535
protected List<TestItem> testData() {
3636
return Arrays.asList(
37+
createTestItem("STRING"),
38+
createTestItem("ARRAY<STRING>"),
3739
createTestItem("CHAR"),
3840
createTestItem("VARCHAR"),
3941
createTestItem("BOOLEAN"),

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ protected TableRow createInputTable() {
4444
return tableRow(
4545
"jdbDynamicTableSource",
4646
field("id", DataTypes.BIGINT().notNull()),
47+
// uuid test field
48+
field("uid_col", dbType("uuid"), DataTypes.STRING().notNull()),
4749
field("decimal_col", DataTypes.DECIMAL(10, 4)),
4850
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
4951
// other fields
@@ -53,16 +55,22 @@ protected TableRow createInputTable() {
5355
}
5456

5557
protected List<Row> getTestData() {
58+
59+
String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
60+
String uuid2 = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11";
61+
5662
return Arrays.asList(
5763
Row.of(
5864
1L,
65+
uuid1,
5966
BigDecimal.valueOf(100.1234),
6067
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
6168
1.175E-37F,
6269
1.79769E308D,
6370
LocalTime.parse("15:35")),
6471
Row.of(
6572
2L,
73+
uuid2,
6674
BigDecimal.valueOf(101.1234),
6775
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
6876
-1.175E-37F,

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ public PostgresMetadata(PostgreSQLContainer<?> container) {
4242
public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean hasXaEnabled) {
4343
this.username = container.getUsername();
4444
this.password = container.getPassword();
45-
this.url = container.getJdbcUrl();
45+
String baseUrl = container.getJdbcUrl();
46+
this.url =
47+
baseUrl.contains("?")
48+
? baseUrl + "&stringtype=unspecified"
49+
: baseUrl + "?stringtype=unspecified";
4650
this.driver = container.getDriverClassName();
4751
this.version = container.getDockerImageName();
4852
this.xaEnabled = hasXaEnabled;

0 commit comments

Comments
 (0)