Skip to content

Commit 2bbeace

Browse files
committed
[FLINK-38807][state] Add state SQL metadata based type inference
1 parent 7ba9741 commit 2bbeace

File tree

11 files changed

+730
-333
lines changed

11 files changed

+730
-333
lines changed

docs/content/docs/libs/state_processor_api.md

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -586,13 +586,6 @@ public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Voi
586586
}
587587
...
588588
}
589-
590-
public class AvroSavepointTypeInformationFactory implements SavepointTypeInformationFactory {
591-
@Override
592-
public TypeInformation<?> getTypeInformation() {
593-
return new AvroTypeInfo<>(AvroRecord.class);
594-
}
595-
}
596589
```
597590

598591
Then it can be read by querying a table created using the following SQL statement:
@@ -609,8 +602,7 @@ CREATE TABLE state_table (
609602
'connector' = 'savepoint',
610603
'state.backend.type' = 'rocksdb',
611604
'state.path' = '/root/dir/of/checkpoint-data/chk-1',
612-
'operator.uid' = 'my-uid',
613-
'fields.MyAvroState.value-type-factory' = 'org.apache.flink.state.table.AvroSavepointTypeInformationFactory'
605+
'operator.uid' = 'my-uid'
614606
);
615607
```
616608

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,27 @@
1919
package org.apache.flink.state.api.runtime;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.core.fs.FSDataInputStream;
23+
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
2224
import org.apache.flink.runtime.checkpoint.Checkpoints;
25+
import org.apache.flink.runtime.checkpoint.OperatorState;
2326
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
2427
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
28+
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
29+
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
30+
import org.apache.flink.runtime.state.KeyedStateHandle;
31+
import org.apache.flink.runtime.state.StreamStateHandle;
2532
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
33+
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
34+
import org.apache.flink.state.api.OperatorIdentifier;
2635

2736
import java.io.DataInputStream;
2837
import java.io.IOException;
38+
import java.util.Map;
39+
import java.util.function.Function;
40+
import java.util.stream.Collectors;
2941

30-
/** Utility class for loading {@link CheckpointMetadata} metadata. */
42+
/** Utility class for loading savepoint metadata and operator state information. */
3143
@Internal
3244
public final class SavepointLoader {
3345
private SavepointLoader() {}
@@ -55,4 +67,70 @@ public static CheckpointMetadata loadSavepointMetadata(String savepointPath)
5567
stream, Thread.currentThread().getContextClassLoader(), savepointPath);
5668
}
5769
}
70+
71+
/**
72+
* Loads all state metadata for an operator in a single I/O operation.
73+
*
74+
* @param savepointPath Path to the savepoint directory
75+
* @param operatorIdentifier Operator UID or hash
76+
* @return Map from state name to StateMetaInfoSnapshot
77+
* @throws IOException If reading fails
78+
*/
79+
public static Map<String, StateMetaInfoSnapshot> loadOperatorStateMetadata(
80+
String savepointPath, OperatorIdentifier operatorIdentifier) throws IOException {
81+
82+
CheckpointMetadata checkpointMetadata = loadSavepointMetadata(savepointPath);
83+
84+
OperatorState operatorState =
85+
checkpointMetadata.getOperatorStates().stream()
86+
.filter(
87+
state ->
88+
operatorIdentifier
89+
.getOperatorId()
90+
.equals(state.getOperatorID()))
91+
.findFirst()
92+
.orElseThrow(
93+
() ->
94+
new IllegalArgumentException(
95+
"Operator "
96+
+ operatorIdentifier
97+
+ " not found in savepoint"));
98+
99+
KeyedStateHandle keyedStateHandle =
100+
operatorState.getStates().stream()
101+
.flatMap(s -> s.getManagedKeyedState().stream())
102+
.findFirst()
103+
.orElseThrow(
104+
() ->
105+
new IllegalArgumentException(
106+
"No keyed state found for operator "
107+
+ operatorIdentifier));
108+
109+
KeyedBackendSerializationProxy<?> proxy = readSerializationProxy(keyedStateHandle);
110+
return proxy.getStateMetaInfoSnapshots().stream()
111+
.collect(Collectors.toMap(StateMetaInfoSnapshot::getName, Function.identity()));
112+
}
113+
114+
private static KeyedBackendSerializationProxy<?> readSerializationProxy(
115+
KeyedStateHandle stateHandle) throws IOException {
116+
117+
StreamStateHandle streamStateHandle;
118+
if (stateHandle instanceof KeyGroupsStateHandle) {
119+
streamStateHandle = ((KeyGroupsStateHandle) stateHandle).getDelegateStateHandle();
120+
} else {
121+
throw new IllegalArgumentException(
122+
"Unsupported KeyedStateHandle type: " + stateHandle.getClass());
123+
}
124+
125+
try (FSDataInputStream inputStream = streamStateHandle.openInputStream()) {
126+
DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
127+
128+
KeyedBackendSerializationProxy<?> proxy =
129+
new KeyedBackendSerializationProxy<>(
130+
Thread.currentThread().getContextClassLoader());
131+
proxy.read(inputView);
132+
133+
return proxy;
134+
}
135+
}
58136
}

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.api.common.state.MapStateDescriptor;
2323
import org.apache.flink.api.common.state.ValueStateDescriptor;
2424
import org.apache.flink.api.common.typeinfo.TypeInformation;
25+
import org.apache.flink.api.common.typeutils.TypeSerializer;
2526
import org.apache.flink.api.java.tuple.Tuple2;
2627
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.configuration.StateBackendOptions;
@@ -91,32 +92,33 @@ public DataStream<RowData> produceDataStream(
9192

9293
// Get value state descriptors
9394
for (StateValueColumnConfiguration columnConfig : keyValueProjections.f1) {
94-
TypeInformation valueTypeInfo = columnConfig.getValueTypeInfo();
95+
TypeSerializer valueTypeSerializer = columnConfig.getValueTypeSerializer();
9596

9697
switch (columnConfig.getStateType()) {
9798
case VALUE:
9899
columnConfig.setStateDescriptor(
99100
new ValueStateDescriptor<>(
100-
columnConfig.getStateName(), valueTypeInfo));
101+
columnConfig.getStateName(), valueTypeSerializer));
101102
break;
102103

103104
case LIST:
104105
columnConfig.setStateDescriptor(
105106
new ListStateDescriptor<>(
106-
columnConfig.getStateName(), valueTypeInfo));
107+
columnConfig.getStateName(), valueTypeSerializer));
107108
break;
108109

109110
case MAP:
110-
TypeInformation<?> mapKeyTypeInfo = columnConfig.getMapKeyTypeInfo();
111-
if (mapKeyTypeInfo == null) {
111+
TypeSerializer<?> mapKeyTypeSerializer =
112+
columnConfig.getMapKeyTypeSerializer();
113+
if (mapKeyTypeSerializer == null) {
112114
throw new ConfigurationException(
113-
"Map key type information is required for map state");
115+
"Map key type serializer is required for map state");
114116
}
115117
columnConfig.setStateDescriptor(
116118
new MapStateDescriptor<>(
117119
columnConfig.getStateName(),
118-
mapKeyTypeInfo,
119-
valueTypeInfo));
120+
mapKeyTypeSerializer,
121+
valueTypeSerializer));
120122
break;
121123

122124
default:

0 commit comments

Comments
 (0)