Skip to content

Commit 617244f

Browse files
authored
[FLINK-38738][table] Expose more context in table connectors to correct UID
This closes #27352.
1 parent 4cb44c0 commit 617244f

File tree

6 files changed

+52
-3
lines changed

6 files changed

+52
-3
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@ public interface ProviderContext {
6161
*/
6262
Optional<String> generateUid(String name);
6363

64+
/**
65+
* Returns the framework's node type in which this connector is embedded.
66+
*
67+
* <p>In other words: It returns the ExecNode's name and version as contained in the compiled
68+
* plan. For example, "stream-exec-table-source-scan_2" or "stream-exec-sink_1".
69+
*/
70+
default String getContainerNodeType() {
71+
return "";
72+
}
73+
6474
/**
6575
* Returns the display name provided by the framework to label the connector.
6676
*

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,26 @@ public interface ExecNode<T> extends ExecNodeTranslator<T>, FusionCodegenExecNod
6565
@JsonProperty(value = FIELD_NAME_ID, index = 0)
6666
int getId();
6767

68+
/**
69+
* The node's type as contained in {@link CompiledPlan} (e.g. "stream-exec-table-source-scan_2"
70+
* consisting of name and version).
71+
*
72+
* <p>A new type including its version can be added by declaring a {@link ExecNodeMetadata}
73+
* annotation.
74+
*
75+
* @see ExecNodeContext#getTypeAsString()
76+
*/
77+
@JsonIgnore
78+
String getTypeAsString();
79+
6880
/**
6981
* The version of the node.
7082
*
7183
* <p>A new version can be added by declaring a {@link ExecNodeMetadata} annotation, potentially
7284
* by copying the old annotation. You can use this method to get the current compiled version
7385
* and execute version-specific logic accordingly.
86+
*
87+
* @see ExecNodeContext#getVersion()
7488
*/
7589
@JsonIgnore
7690
int getVersion();

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public final int getId() {
131131
return context.getId();
132132
}
133133

134+
@Override
135+
public final String getTypeAsString() {
136+
return context.getTypeAsString();
137+
}
138+
134139
@Override
135140
public final int getVersion() {
136141
return context.getVersion();
@@ -345,6 +350,11 @@ public Optional<String> generateUid(String name) {
345350
return Optional.empty();
346351
}
347352

353+
@Override
354+
public String getContainerNodeType() {
355+
return getTypeAsString();
356+
}
357+
348358
@Override
349359
public String getName() {
350360
return metadata.getName();

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,11 @@ public Optional<String> generateUid(String name) {
431431
return providerContext.generateUid(name);
432432
}
433433

434+
@Override
435+
public String getContainerNodeType() {
436+
return providerContext.getContainerNodeType();
437+
}
438+
434439
@Override
435440
public String getName() {
436441
return providerContext.getName();

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public int getId() {
6666
return 0;
6767
}
6868

69+
@Override
70+
public String getTypeAsString() {
71+
return "";
72+
}
73+
6974
@Override
7075
public int getVersion() {
7176
return 0;

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,21 +413,22 @@ private static CompiledPlan planFromFlink1_15Values(TableEnvironment env) {
413413
}
414414

415415
private static CompiledPlan planFromFlink2_2MultiTransformSource(TableEnvironment env) {
416-
createMultiTransformSource(env);
416+
createMultiTransformSource(env, "stream-exec-table-source-scan_1");
417417
// plan content is compiled from
418418
// planFromCurrentFlinkMultiTransformSource() using Flink release-2.2
419419
return env.loadPlan(
420420
PlanReference.fromResource("/jsonplan/testMultiTransformSourceUidsFlink2_2.out"));
421421
}
422422

423423
private static CompiledPlan planFromCurrentFlinkMultiTransformSource(TableEnvironment env) {
424-
createMultiTransformSource(env);
424+
createMultiTransformSource(env, "stream-exec-table-source-scan_2");
425425
return env.from("T")
426426
.insertInto(TableDescriptor.forConnector("blackhole").build())
427427
.compilePlan();
428428
}
429429

430-
private static void createMultiTransformSource(TableEnvironment env) {
430+
private static void createMultiTransformSource(
431+
TableEnvironment env, String expectedSourceExecNode) {
431432
final DataStreamScanProvider scanProvider =
432433
new DataStreamScanProvider() {
433434
@Override
@@ -438,6 +439,10 @@ public boolean isBounded() {
438439
@Override
439440
public DataStream<RowData> produceDataStream(
440441
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
442+
443+
assertThat(providerContext.getContainerNodeType())
444+
.isEqualTo(expectedSourceExecNode);
445+
441446
// UID 1
442447
final SingleOutputStreamOperator<Integer> ints = execEnv.fromData(1, 2, 3);
443448
providerContext.generateUid("my-source").ifPresent(ints::uid);

0 commit comments

Comments
 (0)