Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
: TimestampData.fromTimestamp((Timestamp) val);
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
return val -> StringData.fromString(val == null ? null : val.toString());
case BINARY:
case VARBINARY:
return val -> val;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper {
private static final String PG_CHARACTER_ARRAY = "_character";
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private static final String PG_UUID = "uuid";

@Override
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
Expand Down Expand Up @@ -176,6 +177,8 @@ protected DataType getMapping(String pgType, int precision, int scale) {
return DataTypes.DATE();
case PG_DATE_ARRAY:
return DataTypes.ARRAY(DataTypes.DATE());
case PG_UUID:
return DataTypes.VARCHAR(36);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,24 @@ void testSerialTypes() {
+ "9223372036854775807, "
+ "9223372036854775807]]");
}

@Test
void testUuidTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE))
.execute()
.collect());
assertThat(results).hasToString("[+I[1, 123e4567-e89b-12d3-a456-426614174000]]");
}

@Test
void testNullUuidTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_UUID_TYPE2))
.execute()
.collect());
assertThat(results).hasToString("[+I[1, null]]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ void testListTables() throws DatabaseNotExistException {
"public.serial_table",
"public.t1",
"public.t4",
"public.t5"));
"public.t5",
"public.uuid_table",
"public.uuid_table2"));

actual = catalog.listTables(TEST_DB);

Expand Down Expand Up @@ -186,4 +188,19 @@ public void testSerialDataTypes() throws TableNotExistException {

assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema);
}

@Test
void testUuidDataTypes() throws TableNotExistException {
CatalogBaseTable table =
catalog.getTable(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE));
assertThat(table.getUnresolvedSchema()).isEqualTo(getUuidTable().schema);
}

@Test
void testNullUuidDataTypes() throws TableNotExistException {
CatalogBaseTable table =
catalog.getTable(
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_UUID_TYPE2));
assertThat(table.getUnresolvedSchema()).isEqualTo(getNullUuidTable().schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ private static DatabaseMetadata getStaticMetadata() {
protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
protected static final String TABLE_ARRAY_TYPE = "array_table";
protected static final String TABLE_SERIAL_TYPE = "serial_table";
protected static final String TABLE_UUID_TYPE = "uuid_table";
protected static final String TABLE_UUID_TYPE2 = "uuid_table2";

protected static String baseUrl;
protected static PostgresCatalog catalog;
Expand Down Expand Up @@ -108,6 +110,11 @@ static void init() throws SQLException {
createTable(
PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
getSerialTable().pgSchemaSql);
createTable(
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE), getUuidTable().pgSchemaSql);
createTable(
PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2),
getNullUuidTable().pgSchemaSql);

executeSQL(
PostgresCatalog.DEFAULT_DATABASE,
Expand All @@ -126,6 +133,15 @@ static void init() throws SQLException {
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values));
executeSQL(
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"insert into %s values (%s);", TABLE_UUID_TYPE, getUuidTable().values));
executeSQL(
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"insert into %s values (%s);",
TABLE_UUID_TYPE2, getNullUuidTable().values));
}

@AfterAll
Expand Down Expand Up @@ -162,6 +178,14 @@ static void afterAll() throws SQLException {
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE)));
executeSQL(
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE)));
executeSQL(
PostgresCatalog.DEFAULT_DATABASE,
String.format(
"DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_UUID_TYPE2)));
}

public static void createTable(PostgresTablePath tablePath, String tableSchemaSql)
Expand Down Expand Up @@ -393,4 +417,26 @@ public static TestTable getSerialTable() {
+ "9223372036854775807,"
+ "9223372036854775807");
}

public static TestTable getUuidTable() {
String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
return new TestTable(
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("uid_col", DataTypes.VARCHAR(36))
.build(),
"id INT, " + "uid_col UUID",
String.format("1, '%s'", uuid1));
}

public static TestTable getNullUuidTable() {

return new TestTable(
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("uid_col", DataTypes.VARCHAR(36))
.build(),
"id INT, " + "uid_col UUID",
"1, NULL");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class PostgresDialectTest extends JdbcDialectTest implements PostgresTestBase {
@Override
protected List<TestItem> testData() {
return Arrays.asList(
createTestItem("STRING"),
createTestItem("ARRAY<STRING>"),
createTestItem("CHAR"),
createTestItem("VARCHAR"),
createTestItem("BOOLEAN"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
field("id", DataTypes.BIGINT().notNull()),
// uuid test field
field("uid_col", dbType("uuid"), DataTypes.STRING().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
// other fields
Expand All @@ -53,16 +55,22 @@ protected TableRow createInputTable() {
}

protected List<Row> getTestData() {

String uuid1 = "123e4567-e89b-12d3-a456-426614174000";
String uuid2 = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11";

return Arrays.asList(
Row.of(
1L,
uuid1,
BigDecimal.valueOf(100.1234),
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
1.175E-37F,
1.79769E308D,
LocalTime.parse("15:35")),
Row.of(
2L,
uuid2,
BigDecimal.valueOf(101.1234),
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
-1.175E-37F,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public PostgresMetadata(PostgreSQLContainer<?> container) {
public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean hasXaEnabled) {
this.username = container.getUsername();
this.password = container.getPassword();
this.url = container.getJdbcUrl();
String baseUrl = container.getJdbcUrl();
this.url =
baseUrl.contains("?")
? baseUrl + "&stringtype=unspecified"
: baseUrl + "?stringtype=unspecified";
this.driver = container.getDriverClassName();
this.version = container.getDockerImageName();
this.xaEnabled = hasXaEnabled;
Expand Down
Loading