Skip to content

Commit 2e21270

Browse files
author
och5351
committed
🔥 : improve : support jdbc db options
- Apply review feedback - Add null data test case - Remove unused import
1 parent c088e46 commit 2e21270

File tree

5 files changed

+50
-6
lines changed

5 files changed

+50
-6
lines changed

‎flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialectConverter.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
173173
: TimestampData.fromTimestamp((Timestamp) val);
174174
case CHAR:
175175
case VARCHAR:
176-
return val -> StringData.fromString(val.toString());
176+
return val -> StringData.fromString(val == null ? null: val.toString());
177177
case BINARY:
178178
case VARBINARY:
179179
return val -> val;

‎flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
2929

3030
import java.lang.reflect.Array;
31-
import java.util.UUID;
3231
/**
3332
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
3433
* PostgreSQL.

‎flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogITCase.java‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,16 @@ void testUuidTypes() {
205205
.hasToString(
206206
"[+I[1, 123e4567-e89b-12d3-a456-426614174000]]");
207207
}
208+
209+
@Test
210+
void testNullUuidTypes() {
211+
List<Row> results =
212+
CollectionUtil.iteratorToList(
213+
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE2))
214+
.execute()
215+
.collect());
216+
assertThat(results)
217+
.hasToString(
218+
"[+I[1, NULL]]");
219+
}
208220
}

‎flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ void testListTables() throws DatabaseNotExistException {
7474
"public.t1",
7575
"public.t4",
7676
"public.t5",
77-
"public.uuid_table"));
77+
"public.uuid_table",
78+
"public.uuid_table2"));
7879

7980
actual = catalog.listTables(TEST_DB);
8081

@@ -195,4 +196,12 @@ void testUuidDataTypes() throws TableNotExistException {
195196
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
196197
assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema);
197198
}
199+
200+
@Test
201+
void testNullUuidDataTypes() throws TableNotExistException {
202+
CatalogBaseTable table =
203+
catalog.getTable(
204+
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE2));
205+
assertThat(table.getUnresolvedSchema()).isEqualTo(getNullUuidTable().schema);
206+
}
198207
}

‎flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java‎

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ private static DatabaseMetadata getStaticMetadata() {
5656
protected static final String TABLE_ARRAY_TYPE = "array_table";
5757
protected static final String TABLE_SERIAL_TYPE = "serial_table";
5858
protected static final String TABLE_UUID_TYPE = "uuid_table";
59+
protected static final String TABLE_UUID_TYPE2 = "uuid_table2";
5960

6061
protected static String baseUrl;
6162
protected static PostgresCatalog catalog;
@@ -112,6 +113,9 @@ static void init() throws SQLException {
112113
createTable(
113114
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE),
114115
getUuidTable().pgSchemaSql);
116+
createTable(
117+
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
118+
getNullUuidTable().pgSchemaSql);
115119

116120
executeSQL(
117121
PostgresCatalog.DEFAULT_DATABASE,
@@ -134,6 +138,10 @@ static void init() throws SQLException {
134138
PostgresCatalog.DEFAULT_DATABASE,
135139
String.format(
136140
"insert into %s values (%s);", TABLE_UUID_TYPE, getUuidTable().values));
141+
executeSQL(
142+
PostgresCatalog.DEFAULT_DATABASE,
143+
String.format(
144+
"insert into %s values (%s);", TABLE_UUID_TYPE2, getNullUuidTable().values));
137145
}
138146

139147
@AfterAll
@@ -174,6 +182,10 @@ static void afterAll() throws SQLException {
174182
PostgresCatalog.DEFAULT_DATABASE,
175183
String.format(
176184
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE)));
185+
executeSQL(
186+
PostgresCatalog.DEFAULT_DATABASE,
187+
String.format(
188+
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2)));
177189
}
178190

179191
public static void createTable(PostgresTablePath tablePath, String tableSchemaSql)
@@ -408,6 +420,19 @@ public static TestTable getSerialTable() {
408420

409421
public static TestTable getUuidTable() {
410422
String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
423+
return new TestTable(
424+
Schema.newBuilder()
425+
.column("id", DataTypes.INT())
426+
.column("uid_col", DataTypes.VARCHAR(36))
427+
.build(),
428+
"id INT, "
429+
+ "uid_col UUID",
430+
String.format("1, '%s'", uuid1)
431+
432+
);
433+
}
434+
435+
public static TestTable getNullUuidTable(){
411436

412437
return new TestTable(
413438
Schema.newBuilder()
@@ -416,9 +441,8 @@ public static TestTable getUuidTable() {
416441
.build(),
417442
"id INT, "
418443
+ "uid_col UUID",
419-
String.format(
420-
"1, '%s'",
421-
uuid1)
444+
"1, NULL"
445+
422446
);
423447
}
424448
}

0 commit comments

Comments
 (0)