docs/content/docs/libs/state_processor_api.md (10 changes: 1 addition & 9 deletions)
@@ -586,13 +586,6 @@ public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void>
}
...
}

-public class AvroSavepointTypeInformationFactory implements SavepointTypeInformationFactory {
davidradl (Contributor) commented on Dec 15, 2025:
We should deprecate classes rather than just remove them, and give advice on how to move to the new way of doing things. I suppose this could be a back-port consideration only - if we take this approach, then a comment reminder would be good.

gaborgsomogyi (Contributor, Author) replied:
Deprecate from test code? 🙂 The new way is that Flink finds this out automatically, but the SavepointTypeInformationFactory approach remains as a safety belt.

davidradl (Contributor) replied:
@gaborgsomogyi my bad. I did not look closely enough. If the SavepointTypeInformationFactory approach remains as a safety belt, I suggest we keep the deleted tests that refer to this approach - I assume these tests would still work if there is no metadata.

gaborgsomogyi (Contributor, Author) replied:
They're working, but the whole intention is to infer types and make the user's life easier. The SavepointTypeInformationFactory approach would be worth a new test, though.

gaborgsomogyi (Contributor, Author) replied:
Added test coverage.

-    @Override
-    public TypeInformation<?> getTypeInformation() {
-        return new AvroTypeInfo<>(AvroRecord.class);
-    }
-}
```
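As the review thread above notes, type inference is now automatic and SavepointTypeInformationFactory stays available as a safety belt. A minimal sketch of that fallback path for a non-Avro type (not from the PR; `MyPojo` is a hypothetical user class):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Sketch of the factory fallback: pin the value type explicitly when
// automatic inference cannot recover it from the savepoint metadata.
public class PojoSavepointTypeInformationFactory implements SavepointTypeInformationFactory {
    @Override
    public TypeInformation<?> getTypeInformation() {
        // MyPojo stands in for any state value type that needs pinning.
        return TypeInformation.of(MyPojo.class);
    }
}
```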

The state can then be read by querying a table created using the following SQL statement:
@@ -609,8 +602,7 @@ CREATE TABLE state_table (
'connector' = 'savepoint',
'state.backend.type' = 'rocksdb',
'state.path' = '/root/dir/of/checkpoint-data/chk-1',
-'operator.uid' = 'my-uid',
-'fields.MyAvroState.value-type-factory' = 'org.apache.flink.state.table.AvroSavepointTypeInformationFactory'
+'operator.uid' = 'my-uid'
);
```
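As a usage sketch (not part of this PR), the table declared above can then be queried through the Table API; `state_table` matches the snippet, everything else is illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadSavepointState {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Register the table first by executing the CREATE TABLE statement
        // shown above, then the keyed state is queryable like any table.
        tEnv.executeSql(CREATE_TABLE_STATEMENT);
        tEnv.executeSql("SELECT * FROM state_table").print();
    }

    // Placeholder for the full CREATE TABLE statement from the docs snippet.
    private static final String CREATE_TABLE_STATEMENT = "...";
}
```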

SavepointLoader.java (package org.apache.flink.state.api.runtime)
@@ -19,15 +19,27 @@
package org.apache.flink.state.api.runtime;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.api.OperatorIdentifier;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

-/** Utility class for loading {@link CheckpointMetadata} metadata. */
+/** Utility class for loading savepoint metadata and operator state information. */
@Internal
public final class SavepointLoader {
private SavepointLoader() {}
@@ -55,4 +67,70 @@ public static CheckpointMetadata loadSavepointMetadata(String savepointPath)
stream, Thread.currentThread().getContextClassLoader(), savepointPath);
}
}

/**
* Loads all state metadata for an operator in a single I/O operation.
*
* @param savepointPath Path to the savepoint directory
* @param operatorIdentifier Operator UID or hash
* @return Map from state name to StateMetaInfoSnapshot
* @throws IOException If reading fails
*/
public static Map<String, StateMetaInfoSnapshot> loadOperatorStateMetadata(
String savepointPath, OperatorIdentifier operatorIdentifier) throws IOException {

CheckpointMetadata checkpointMetadata = loadSavepointMetadata(savepointPath);

OperatorState operatorState =
checkpointMetadata.getOperatorStates().stream()
.filter(
state ->
operatorIdentifier
.getOperatorId()
.equals(state.getOperatorID()))
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
"Operator "
+ operatorIdentifier
+ " not found in savepoint"));

KeyedStateHandle keyedStateHandle =
operatorState.getStates().stream()
.flatMap(s -> s.getManagedKeyedState().stream())
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
"No keyed state found for operator "
+ operatorIdentifier));

KeyedBackendSerializationProxy<?> proxy = readSerializationProxy(keyedStateHandle);
return proxy.getStateMetaInfoSnapshots().stream()
.collect(Collectors.toMap(StateMetaInfoSnapshot::getName, Function.identity()));
}

private static KeyedBackendSerializationProxy<?> readSerializationProxy(
KeyedStateHandle stateHandle) throws IOException {

StreamStateHandle streamStateHandle;
if (stateHandle instanceof KeyGroupsStateHandle) {
streamStateHandle = ((KeyGroupsStateHandle) stateHandle).getDelegateStateHandle();
} else {
throw new IllegalArgumentException(
"Unsupported KeyedStateHandle type: " + stateHandle.getClass());
}

try (FSDataInputStream inputStream = streamStateHandle.openInputStream()) {
DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);

KeyedBackendSerializationProxy<?> proxy =
new KeyedBackendSerializationProxy<>(
Thread.currentThread().getContextClassLoader());
proxy.read(inputView);

return proxy;
}
}
}
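A hedged usage sketch of the new loader entry point; note that SavepointLoader is marked @Internal, so this is not supported user-facing API. The savepoint path and UID are placeholders taken from the docs snippet:

```java
import java.util.Map;

import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.runtime.SavepointLoader;

public class InspectOperatorState {
    public static void main(String[] args) throws Exception {
        // One I/O pass resolves every registered keyed state of the operator.
        Map<String, StateMetaInfoSnapshot> states =
                SavepointLoader.loadOperatorStateMetadata(
                        "/root/dir/of/checkpoint-data/chk-1",
                        OperatorIdentifier.forUid("my-uid"));
        // Each entry maps a state name to its meta info snapshot.
        states.keySet().forEach(System.out::println);
    }
}
```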
(next changed file)
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
@@ -91,32 +92,33 @@ public DataStream<RowData> produceDataStream(

// Get value state descriptors
for (StateValueColumnConfiguration columnConfig : keyValueProjections.f1) {
-TypeInformation valueTypeInfo = columnConfig.getValueTypeInfo();
+TypeSerializer valueTypeSerializer = columnConfig.getValueTypeSerializer();

switch (columnConfig.getStateType()) {
case VALUE:
columnConfig.setStateDescriptor(
new ValueStateDescriptor<>(
-columnConfig.getStateName(), valueTypeInfo));
+columnConfig.getStateName(), valueTypeSerializer));
break;

case LIST:
columnConfig.setStateDescriptor(
new ListStateDescriptor<>(
-columnConfig.getStateName(), valueTypeInfo));
+columnConfig.getStateName(), valueTypeSerializer));
break;

case MAP:
-TypeInformation<?> mapKeyTypeInfo = columnConfig.getMapKeyTypeInfo();
-if (mapKeyTypeInfo == null) {
+TypeSerializer<?> mapKeyTypeSerializer =
+        columnConfig.getMapKeyTypeSerializer();
+if (mapKeyTypeSerializer == null) {
throw new ConfigurationException(
-"Map key type information is required for map state");
+"Map key type serializer is required for map state");
}
columnConfig.setStateDescriptor(
new MapStateDescriptor<>(
columnConfig.getStateName(),
-mapKeyTypeInfo,
-valueTypeInfo));
+mapKeyTypeSerializer,
+valueTypeSerializer));
break;

default:
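For context on the migration above (not part of the diff): the state descriptor classes accept either a TypeInformation or a TypeSerializer, and the connector now passes serializers directly. A minimal sketch of the two constructor variants, with illustrative names:

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.LongSerializer;

public class DescriptorVariants {
    public static void main(String[] args) {
        // TypeInformation-based: the concrete serializer is derived later.
        ValueStateDescriptor<Long> byTypeInfo =
                new ValueStateDescriptor<>("my-state", Types.LONG);

        // TypeSerializer-based: the serializer is fixed up front,
        // which is what the connector switches to in this diff.
        ValueStateDescriptor<Long> bySerializer =
                new ValueStateDescriptor<>("my-state", LongSerializer.INSTANCE);

        System.out.println(byTypeInfo.getName());
        System.out.println(bySerializer.getName());
    }
}
```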