Skip to content

Commit 4b8c7fc

Browse files
author
och5351
committed
[hotfix][Connector/JDBC] Support PostgreSQL uuid type
1 parent 2e21270 commit 4b8c7fc

File tree

7 files changed

+17
-25
lines changed

7 files changed

+17
-25
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
173173
: TimestampData.fromTimestamp((Timestamp) val);
174174
case CHAR:
175175
case VARCHAR:
176-
return val -> StringData.fromString(val == null ? null: val.toString());
176+
return val -> StringData.fromString(val == null ? null : val.toString());
177177
case BINARY:
178178
case VARBINARY:
179179
return val -> val;

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
2929

3030
import java.lang.reflect.Array;
31+
3132
/**
3233
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
3334
* PostgreSQL.

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,7 @@ void testUuidTypes() {
201201
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE))
202202
.execute()
203203
.collect());
204-
assertThat(results)
205-
.hasToString(
206-
"[+I[1, 123e4567-e89b-12d3-a456-426614174000]]");
204+
assertThat(results).hasToString("[+I[1, 123e4567-e89b-12d3-a456-426614174000]]");
207205
}
208206

209207
@Test
@@ -213,8 +211,6 @@ void testNullUuidTypes() {
213211
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE2))
214212
.execute()
215213
.collect());
216-
assertThat(results)
217-
.hasToString(
218-
"[+I[1, NULL]]");
214+
assertThat(results).hasToString("[+I[1, NULL]]");
219215
}
220216
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,7 @@ public void testSerialDataTypes() throws TableNotExistException {
192192
@Test
193193
void testUuidDataTypes() throws TableNotExistException {
194194
CatalogBaseTable table =
195-
catalog.getTable(
196-
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
195+
catalog.getTable(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
197196
assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema);
198197
}
199198

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ static void init() throws SQLException {
111111
PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
112112
getSerialTable().pgSchemaSql);
113113
createTable(
114-
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE),
115-
getUuidTable().pgSchemaSql);
114+
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE), getUuidTable().pgSchemaSql);
116115
createTable(
117116
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
118117
getNullUuidTable().pgSchemaSql);
@@ -141,7 +140,8 @@ static void init() throws SQLException {
141140
executeSQL(
142141
PostgresCatalog.DEFAULT_DATABASE,
143142
String.format(
144-
"insert into %s values (%s);", TABLE_UUID_TYPE2, getNullUuidTable().values));
143+
"insert into %s values (%s);",
144+
TABLE_UUID_TYPE2, getNullUuidTable().values));
145145
}
146146

147147
@AfterAll
@@ -425,24 +425,18 @@ public static TestTable getUuidTable() {
425425
.column("id", DataTypes.INT())
426426
.column("uid_col", DataTypes.VARCHAR(36))
427427
.build(),
428-
"id INT, "
429-
+ "uid_col UUID",
430-
String.format("1, '%s'", uuid1)
431-
432-
);
428+
"id INT, " + "uid_col UUID",
429+
String.format("1, '%s'", uuid1));
433430
}
434431

435-
public static TestTable getNullUuidTable(){
432+
public static TestTable getNullUuidTable() {
436433

437434
return new TestTable(
438435
Schema.newBuilder()
439436
.column("id", DataTypes.INT())
440437
.column("uid_col", DataTypes.VARCHAR(36))
441438
.build(),
442-
"id INT, "
443-
+ "uid_col UUID",
444-
"1, NULL"
445-
446-
);
439+
"id INT, " + "uid_col UUID",
440+
"1, NULL");
447441
}
448442
}

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ protected TableRow createInputTable() {
4646
field("id", DataTypes.BIGINT().notNull()),
4747
// uuid test field
4848
field("uid_col", dbType("uuid"), DataTypes.STRING().notNull()),
49-
5049
field("decimal_col", DataTypes.DECIMAL(10, 4)),
5150
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
5251
// other fields

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean hasXaEnabled
4343
this.username = container.getUsername();
4444
this.password = container.getPassword();
4545
String baseUrl = container.getJdbcUrl();
46-
this.url = baseUrl.contains("?") ? baseUrl + "&stringtype=unspecified" : baseUrl + "?stringtype=unspecified";
46+
this.url =
47+
baseUrl.contains("?")
48+
? baseUrl + "&stringtype=unspecified"
49+
: baseUrl + "?stringtype=unspecified";
4750
this.driver = container.getDriverClassName();
4851
this.version = container.getDockerImageName();
4952
this.xaEnabled = hasXaEnabled;

0 commit comments

Comments
 (0)