Skip to content

Commit 2260c11

Browse files
authored
[FLINK-38802][table] DDL with duplicated keys in table options should not fail
1 parent 5954b5b commit 2260c11

File tree

3 files changed

+27
-18
lines changed

3 files changed

+27
-18
lines changed

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import org.apache.calcite.sql.SqlNode;
2525
import org.apache.calcite.sql.SqlNodeList;
2626
import org.apache.calcite.util.NlsString;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
import javax.annotation.Nullable;
2931

32+
import java.util.HashMap;
3033
import java.util.LinkedHashMap;
3134
import java.util.List;
3235
import java.util.Map;
@@ -36,6 +39,7 @@
3639

3740
/** Utils methods for parsing DDLs. */
3841
public class SqlParseUtils {
42+
private static final Logger LOG = LoggerFactory.getLogger(SqlParseUtils.class);
3943

4044
private SqlParseUtils() {}
4145

@@ -85,9 +89,15 @@ public static Map<String, String> extractMap(@Nullable SqlNodeList propList) {
8589
if (propList == null) {
8690
return Map.of();
8791
}
88-
return propList.getList().stream()
89-
.map(p -> (SqlTableOption) p)
90-
.collect(Collectors.toMap(k -> k.getKeyString(), SqlTableOption::getValueString));
92+
final Map<String, String> result = new HashMap<>();
93+
for (SqlNode node : propList) {
94+
final SqlTableOption tableOption = (SqlTableOption) node;
95+
final String key = tableOption.getKeyString();
96+
if (result.put(key, tableOption.getValueString()) != null) {
97+
LOG.warn("There are duplicated values for the same key {}", key);
98+
}
99+
}
100+
return result;
91101
}
92102

93103
public static List<String> extractList(

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.flink.sql.parser.ddl.catalog;
2020

21+
import org.apache.flink.sql.parser.SqlParseUtils;
2122
import org.apache.flink.sql.parser.SqlUnparseUtils;
22-
import org.apache.flink.sql.parser.ddl.SqlTableOption;
2323

2424
import org.apache.calcite.sql.SqlIdentifier;
2525
import org.apache.calcite.sql.SqlNode;
@@ -30,7 +30,6 @@
3030

3131
import java.util.List;
3232
import java.util.Map;
33-
import java.util.stream.Collectors;
3433

3534
import static java.util.Objects.requireNonNull;
3635

@@ -55,13 +54,7 @@ public SqlNodeList getPropertyList() {
5554
}
5655

5756
public Map<String, String> getProperties() {
58-
return propertyList.stream()
59-
.map(p -> (SqlTableOption) p)
60-
.collect(
61-
Collectors.toMap(
62-
SqlTableOption::getKeyString,
63-
SqlTableOption::getValueString,
64-
(option1, option2) -> option2));
57+
return SqlParseUtils.extractMap(propertyList);
6558
}
6659

6760
@Override

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,28 +1117,33 @@ void testAlterTable() throws Exception {
11171117
// test alter table options
11181118
checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1', 'K2' = 'V2')");
11191119
Operation operation =
1120-
parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
1120+
parse(
1121+
"alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 'k2' = 'v2', 'K2' = 'V1', 'K2' = 'V2')");
11211122
Map<String, String> expectedOptions = new HashMap<>();
11221123
expectedOptions.put("connector", "dummy");
11231124
expectedOptions.put("k", "v");
11241125
expectedOptions.put("k1", "v1");
1126+
expectedOptions.put("k2", "v2");
11251127
expectedOptions.put("K2", "V2");
11261128

11271129
assertAlterTableOptions(
11281130
operation,
11291131
expectedIdentifier,
11301132
expectedOptions,
1131-
Arrays.asList(TableChange.set("k1", "v1"), TableChange.set("K2", "V2")),
1132-
"ALTER TABLE IF EXISTS cat1.db1.tb1\n SET 'k1' = 'v1',\n SET 'K2' = 'V2'");
1133+
List.of(
1134+
TableChange.set("k1", "v1"),
1135+
TableChange.set("k2", "v2"),
1136+
TableChange.set("K2", "V2")),
1137+
"ALTER TABLE IF EXISTS cat1.db1.tb1\n SET 'k1' = 'v1',\n SET 'k2' = 'v2',\n SET 'K2' = 'V2'");
11331138

11341139
// test alter table reset
11351140
checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
11361141
operation = parse("alter table if exists cat1.db1.tb1 reset ('k')");
11371142
assertAlterTableOptions(
11381143
operation,
11391144
expectedIdentifier,
1140-
Collections.singletonMap("connector", "dummy"),
1141-
Collections.singletonList(TableChange.reset("k")),
1145+
Map.of("connector", "dummy"),
1146+
List.of(TableChange.reset("k")),
11421147
"ALTER TABLE IF EXISTS cat1.db1.tb1\n RESET 'k'");
11431148
assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ('connector')"))
11441149
.isInstanceOf(ValidationException.class)
@@ -2885,7 +2890,8 @@ private void assertAlterTableOptions(
28852890
assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
28862891
assertThat(alterTableOptionsOperation.getNewTable().getOptions())
28872892
.isEqualTo(expectedOptions);
2888-
assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
2893+
assertThat(expectedChanges)
2894+
.containsExactlyInAnyOrderElementsOf(alterTableOptionsOperation.getTableChanges());
28892895
assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
28902896
}
28912897

0 commit comments

Comments
 (0)