From e1e0ed0b97ad6c85ed531d0cb4f9d18e0ba61121 Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Tue, 9 Dec 2025 05:16:40 -0800 Subject: [PATCH 01/12] Fix Iceberg duplicate field id in converted schema --- .../iceberg/IcebergSchemaExtractor.java | 15 ++++-- .../iceberg/TestIcebergSchemaExtractor.java | 51 +++++++++++++++++++ 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 4366bc025..c9163ec5c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -146,10 +146,14 @@ private List convertFields( List ids = schema.getFields().stream() .map( - field -> - field.getFieldId() == null - ? fieldIdTracker.incrementAndGet() - : field.getFieldId()) + field -> { + int id = + field.getFieldId() == null + ? fieldIdTracker.incrementAndGet() + : field.getFieldId(); + fieldIdTracker.accumulateAndGet(id, Math::max); + return id; + }) .collect(CustomCollectors.toList(schema.getFields().size())); List nestedFields = new ArrayList<>(schema.getFields().size()); for (int i = 0; i < schema.getFields().size(); i++) { @@ -218,8 +222,10 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { .findFirst() .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); int keyId = key.getFieldId() == null ? fieldIdTracker.incrementAndGet() : key.getFieldId(); + fieldIdTracker.accumulateAndGet(keyId, Math::max); int valueId = value.getFieldId() == null ? fieldIdTracker.incrementAndGet() : value.getFieldId(); + fieldIdTracker.accumulateAndGet(valueId, Math::max); if (field.getSchema().isNullable()) { return Types.MapType.ofOptional( keyId, @@ -244,6 +250,7 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { .orElseThrow(() -> new SchemaExtractorException("Invalid array schema")); int elementId = element.getFieldId() == null ? 
fieldIdTracker.incrementAndGet() : element.getFieldId(); + fieldIdTracker.accumulateAndGet(elementId, Math::max); if (field.getSchema().isNullable()) { return Types.ListType.ofOptional(elementId, toIcebergType(element, fieldIdTracker)); } else { diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java index 824e22856..3c3d85a01 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java @@ -1044,4 +1044,55 @@ public void testToIcebergWithNoFieldIdsSet() { Assertions.assertTrue( icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema))); } + + @Test + public void testToIcebergWithPartialFieldIdsSet() { + InternalSchema internalSchema = + InternalSchema.builder() + .name("testRecord") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("name") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("scores") + .schema( + InternalSchema.builder() + .name("array") + .dataType(InternalType.LIST) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_element") + .parentPath("scores") + .schema( + InternalSchema.builder() + .name("long") + .dataType(InternalType.LONG) + .isNullable(true) + .build()) + .fieldId(null) + .build())) + .build()) + .fieldId(2) + .build())) + .build(); + Schema icebergRepresentation = + new Schema( + Types.NestedField.optional(1, "name", Types.StringType.get()), + Types.NestedField.optional( + 2, "scores", Types.ListType.ofOptional(3, Types.LongType.get()))); + assertTrue(icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema))); + } } From 210ea6288ad5378ce2edbfe1dd5a91a0cfd5f41f Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Thu, 11 Dec 2025 04:09:26 -0800 Subject: [PATCH 02/12] Traverse the whole schema before converting, add test for the whole conversion process --- .../iceberg/IcebergSchemaExtractor.java | 60 +++++++++++++++---- .../apache/xtable/ITConversionController.java | 23 +++++++ .../apache/xtable/TestSparkDeltaTable.java | 18 +++++- .../apache/xtable/delta/TestDeltaHelper.java | 12 ++++ 4 files changed, 101 insertions(+), 12 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index c9163ec5c..a635dafa0 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -62,9 +62,54 @@ public static IcebergSchemaExtractor getInstance() { return INSTANCE; } + private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger fieldIdTracker) { + schema.getFields().stream() + .forEach( + field -> { + if (field.getFieldId() != null) + fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max); + initializeFieldIdTracker(field, fieldIdTracker); + }); + } + + private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldIdTracker) { + switch (field.getSchema().getDataType()) { + case RECORD: + initializeFieldIdTracker(field.getSchema(), 
fieldIdTracker); + return; + case MAP: + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()) + || InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .forEach( + mapField -> { + if (mapField.getFieldId() != null) + fieldIdTracker.accumulateAndGet(mapField.getFieldId(), Math::max); + initializeFieldIdTracker(mapField, fieldIdTracker); + }); + return; + case LIST: + field.getSchema().getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(arrayField.getName())) + .forEach( + arrayField -> { + if (arrayField.getFieldId() != null) + fieldIdTracker.accumulateAndGet(arrayField.getFieldId(), Math::max); + initializeFieldIdTracker(arrayField, fieldIdTracker); + }); + } + } + public Schema toIceberg(InternalSchema internalSchema) { // if field IDs are not assigned in the source, just use an incrementing integer AtomicInteger fieldIdTracker = new AtomicInteger(0); + // traverse the schema before converting it to make sure fieldIdTracker won't return any + // fieldIds used in the schema + initializeFieldIdTracker(internalSchema, fieldIdTracker); List nestedFields = convertFields(internalSchema, fieldIdTracker); List recordKeyFields = internalSchema.getRecordKeyFields(); boolean recordKeyFieldsAreNotRequired = @@ -146,14 +191,10 @@ private List convertFields( List ids = schema.getFields().stream() .map( - field -> { - int id = - field.getFieldId() == null - ? fieldIdTracker.incrementAndGet() - : field.getFieldId(); - fieldIdTracker.accumulateAndGet(id, Math::max); - return id; - }) + field -> + field.getFieldId() == null + ? fieldIdTracker.incrementAndGet() + : field.getFieldId()) .collect(CustomCollectors.toList(schema.getFields().size())); List nestedFields = new ArrayList<>(schema.getFields().size()); for (int i = 0; i < schema.getFields().size(); i++) { @@ -222,10 +263,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { .findFirst() .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); int keyId = key.getFieldId() == null ? fieldIdTracker.incrementAndGet() : key.getFieldId(); - fieldIdTracker.accumulateAndGet(keyId, Math::max); int valueId = value.getFieldId() == null ? fieldIdTracker.incrementAndGet() : value.getFieldId(); - fieldIdTracker.accumulateAndGet(valueId, Math::max); if (field.getSchema().isNullable()) { return Types.MapType.ofOptional( keyId, @@ -250,7 +289,6 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { .orElseThrow(() -> new SchemaExtractorException("Invalid array schema")); int elementId = element.getFieldId() == null ? 
fieldIdTracker.incrementAndGet() : element.getFieldId(); - fieldIdTracker.accumulateAndGet(elementId, Math::max); if (field.getSchema().isNullable()) { return Types.ListType.ofOptional(elementId, toIcebergType(element, fieldIdTracker)); } else { diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index b8ea413bb..959d1f01d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -746,6 +746,29 @@ public void testIcebergCorruptedSnapshotRecovery() throws Exception { } } + @Test + public void testColumnMappingEnabledDeltaToIceberg() { + String tableName = getTableName(); + ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(DELTA); + try (TestSparkDeltaTable table = + TestSparkDeltaTable.forColumnMappingEnabled(tableName, tempDir, sparkSession, null)) { + table.insertRows(20); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + ConversionConfig conversionConfig = + getTableSyncConfig( + DELTA, + SyncMode.INCREMENTAL, + tableName, + table, + Collections.singletonList(ICEBERG), + null, + null); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 20); + } + } + @Test public void testMetadataRetention() throws Exception { String tableName = getTableName(); diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index 028eca1bb..24b46f4d8 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -69,12 +69,27 @@ public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning( return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true); } + public static TestSparkDeltaTable forColumnMappingEnabled( + String tableName, Path tempDir, SparkSession sparkSession, String partitionField) { + return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, false, true); + } + public TestSparkDeltaTable( String name, Path tempDir, SparkSession sparkSession, String partitionField, boolean includeAdditionalColumns) { + this(name, tempDir, sparkSession, partitionField, includeAdditionalColumns, false); + } + + public TestSparkDeltaTable( + String name, + Path tempDir, + SparkSession sparkSession, + String partitionField, + boolean includeAdditionalColumns, + boolean enableColumnMapping) { try { this.tableName = name; this.basePath = initBasePath(tempDir, tableName); @@ -82,7 +97,8 @@ public TestSparkDeltaTable( this.partitionField = partitionField; this.includeAdditionalColumns = includeAdditionalColumns; this.testDeltaHelper = - TestDeltaHelper.createTestDataHelper(partitionField, includeAdditionalColumns); + TestDeltaHelper.createTestDataHelper( + partitionField, includeAdditionalColumns, enableColumnMapping); testDeltaHelper.createTable(sparkSession, tableName, basePath); this.deltaLog = DeltaLog.forTable(sparkSession, basePath); this.deltaTable = DeltaTable.forPath(sparkSession, basePath); diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java index 
a677f57ee..cacce9506 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java @@ -117,14 +117,21 @@ public class TestDeltaHelper { StructType tableStructSchema; String partitionField; boolean includeAdditionalColumns; + boolean enableColumnMapping; public static TestDeltaHelper createTestDataHelper( String partitionField, boolean includeAdditionalColumns) { + return createTestDataHelper(partitionField, includeAdditionalColumns, false); + } + + public static TestDeltaHelper createTestDataHelper( + String partitionField, boolean includeAdditionalColumns, boolean enableColumnMapping) { StructType tableSchema = generateDynamicSchema(partitionField, includeAdditionalColumns); return TestDeltaHelper.builder() .tableStructSchema(tableSchema) .partitionField(partitionField) .includeAdditionalColumns(includeAdditionalColumns) + .enableColumnMapping(enableColumnMapping) .build(); } @@ -160,6 +167,11 @@ public void createTable(SparkSession sparkSession, String tableName, String base if (includeAdditionalColumns) { tableBuilder.addColumn("street", StringType); } + if (enableColumnMapping) { + tableBuilder.property("delta.minReaderVersion", "2"); + tableBuilder.property("delta.minWriterVersion", "5"); + tableBuilder.property("delta.columnMapping.mode", "name"); + } tableBuilder.execute(); } From 4526881a7923ba6792a8f3baf722848a424ddef0 Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Mon, 15 Dec 2025 01:01:25 -0800 Subject: [PATCH 03/12] update name mapping in the target Iceberg to read values from correct parquet columns --- .../xtable/model/schema/InternalField.java | 1 + .../xtable/delta/DeltaSchemaExtractor.java | 6 ++++ .../iceberg/IcebergConversionTarget.java | 34 +++++++++++++++++++ .../iceberg/IcebergSchemaExtractor.java | 5 +++ .../apache/xtable/TestSparkDeltaTable.java | 2 +- 5 files changed, 47 insertions(+), 1 deletion(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java index 31eb0ed41..23686ad4b 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java @@ -43,6 +43,7 @@ public class InternalField { // The id field for the field. This is used to identify the field in the schema even after // renames. 
Integer fieldId; + @Getter String storageName; // represents the fully qualified path to the field (dot separated) @Getter(lazy = true) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index 1376f884e..5119f5f5c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -55,6 +55,7 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaSchemaExtractor { private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id"; + private static final String DELTA_COLUMN_MAPPING_NAME = "delta.columnMapping.physicalName"; private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor(); // Timestamps in Delta are microsecond precision by default private static final Map @@ -136,6 +137,10 @@ private InternalSchema toInternalSchema( field.metadata().contains(DELTA_COLUMN_MAPPING_ID) ? (int) field.metadata().getLong(DELTA_COLUMN_MAPPING_ID) : null; + String storageName = + field.metadata().contains(DELTA_COLUMN_MAPPING_NAME) + ? field.metadata().getString(DELTA_COLUMN_MAPPING_NAME) + : null; String fieldComment = field.getComment().isDefined() ? field.getComment().get() : null; InternalSchema schema = @@ -148,6 +153,7 @@ private InternalSchema toInternalSchema( return InternalField.builder() .name(field.name()) .fieldId(fieldId) + .storageName(storageName) .parentPath(parentPath) .schema(schema) .defaultValue( diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index b05089d0a..8761aa238 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -20,9 +20,12 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import lombok.extern.log4j.Log4j2; @@ -39,6 +42,11 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; @@ -161,9 +169,35 @@ private void initializeTableIfRequired(InternalTable internalTable) { } } + private MappedFields updateNameMapping(MappedFields mapping, Map updates) { + if (mapping == null) { + return null; + } + List fieldResults = new ArrayList<>(); + for (MappedField field : mapping.fields()) { + Set fieldNames = new HashSet<>(field.names()); + if (updates.containsKey(field.id())) { + fieldNames.add(updates.get(field.id())); + } + MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), updates); + fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping)); + } + return MappedFields.of(fieldResults); + } + @Override public void syncSchema(InternalSchema schema) { Schema latestSchema = schemaExtractor.toIceberg(schema); + if 
(!schemaExtractor.getIdToStorageName().isEmpty()) { + NameMapping mapping = MappingUtil.create(latestSchema); + NameMapping updatedMapping = + NameMapping.of( + updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName())); + transaction + .updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(updatedMapping)) + .commit(); + } if (!transaction.table().schema().sameSchema(latestSchema)) { boolean hasFieldIds = schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index a635dafa0..7d4a6ebae 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import lombok.AccessLevel; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -57,6 +58,7 @@ public class IcebergSchemaExtractor { private static final String MAP_KEY_FIELD_NAME = "key"; private static final String MAP_VALUE_FIELD_NAME = "value"; private static final String LIST_ELEMENT_FIELD_NAME = "element"; + @Getter private final Map idToStorageName = new HashMap<>(); public static IcebergSchemaExtractor getInstance() { return INSTANCE; @@ -199,6 +201,9 @@ private List convertFields( List nestedFields = new ArrayList<>(schema.getFields().size()); for (int i = 0; i < schema.getFields().size(); i++) { InternalField field = schema.getFields().get(i); + if (field.getStorageName() != null) { + idToStorageName.put(ids.get(i), field.getStorageName()); + } nestedFields.add( Types.NestedField.of( ids.get(i), diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index 24b46f4d8..35787d84a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -71,7 +71,7 @@ public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning( public static TestSparkDeltaTable forColumnMappingEnabled( String tableName, Path tempDir, SparkSession sparkSession, String partitionField) { - return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, false, true); + return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true, true); } public TestSparkDeltaTable( From 2d9268890a64bbb767f1ef4c1a7f41b68b051a6c Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Tue, 16 Dec 2025 22:04:08 -0800 Subject: [PATCH 04/12] Add comments for physical field name and add more fields for schema extraction test --- .../xtable/model/schema/InternalField.java | 1 + .../iceberg/TestIcebergSchemaExtractor.java | 97 ++++++++++++++++++- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java index 23686ad4b..3e4ec4d11 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java @@ -43,6 +43,7 @@ public class InternalField { // The id field for the field. 
This is used to identify the field in the schema even after // renames. Integer fieldId; + // The name of the column in the data file used to store this field, if name mapping is enabled @Getter String storageName; // represents the fully qualified path to the field (dot separated) diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java index 3c3d85a01..f842f99c8 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java @@ -1066,6 +1066,7 @@ public void testToIcebergWithPartialFieldIdsSet() { .build(), InternalField.builder() .name("scores") + .fieldId(2) .schema( InternalSchema.builder() .name("array") @@ -1085,14 +1086,106 @@ public void testToIcebergWithPartialFieldIdsSet() { .fieldId(null) .build())) .build()) - .fieldId(2) + .build(), + InternalField.builder() + .name("record_map") + .fieldId(3) + .schema( + InternalSchema.builder() + .name("map") + .dataType(InternalType.MAP) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key") + .parentPath("record_map") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("_one_field_value") + .parentPath("record_map") + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("nested_int") + .fieldId(5) + .parentPath( + "record_map._one_field_value") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("primitive_map") + .fieldId(4) + .schema( + InternalSchema.builder() + .name("map") + .dataType(InternalType.MAP) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key") + .parentPath("primitive_map") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("_one_field_value") + .parentPath("primitive_map") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build()) .build())) .build(); Schema icebergRepresentation = new Schema( Types.NestedField.optional(1, "name", Types.StringType.get()), Types.NestedField.optional( - 2, "scores", Types.ListType.ofOptional(3, Types.LongType.get()))); + 2, "scores", Types.ListType.ofOptional(6, Types.LongType.get())), + Types.NestedField.optional( + 3, + "record_map", + Types.MapType.ofOptional( + 7, + 8, + Types.StringType.get(), + Types.StructType.of( + Arrays.asList( + Types.NestedField.optional( + 5, "nested_int", Types.IntegerType.get()))))), + Types.NestedField.required( + 4, + "primitive_map", + Types.MapType.ofRequired(9, 10, Types.StringType.get(), Types.IntegerType.get()))); assertTrue(icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema))); } } From fc763a9517d5d86beeee05fe2021b22a71ca0377 Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Wed, 17 Dec 2025 17:44:33 -0800 Subject: [PATCH 05/12] reset id to name mapping before 
extraction --- .../java/org/apache/xtable/model/schema/InternalField.java | 3 ++- .../org/apache/xtable/iceberg/IcebergSchemaExtractor.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java index 3e4ec4d11..32d4da262 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java @@ -43,7 +43,8 @@ public class InternalField { // The id field for the field. This is used to identify the field in the schema even after // renames. Integer fieldId; - // The name of the column in the data file used to store this field, if name mapping is enabled + // The name of the column in the data file used to store this field when it differs from the name + // in table's definition @Getter String storageName; // represents the fully qualified path to the field (dot separated) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 7d4a6ebae..1b13a2983 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -112,6 +112,10 @@ public Schema toIceberg(InternalSchema internalSchema) { // traverse the schema before converting it to make sure fieldIdTracker won't return any // fieldIds used in the schema initializeFieldIdTracker(internalSchema, fieldIdTracker); + // since IcebergSchemaExtractor is used as a singleton, idToStorageName may contain the results + // extracted in the last run. To reflect the latest schema, it should be reset before the schema + // extraction. 
+ idToStorageName.clear(); List nestedFields = convertFields(internalSchema, fieldIdTracker); List recordKeyFields = internalSchema.getRecordKeyFields(); boolean recordKeyFieldsAreNotRequired = From 6526ab4a2d05aef42c14fdb80023eca02ab65afe Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Thu, 18 Dec 2025 16:05:03 -0800 Subject: [PATCH 06/12] Update name mapping in one place --- .../xtable/iceberg/IcebergConversionTarget.java | 16 ++++++++++------ .../xtable/iceberg/IcebergTableManager.java | 12 ++---------- .../xtable/iceberg/TestIcebergTableManager.java | 17 ++--------------- 3 files changed, 14 insertions(+), 31 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index 8761aa238..d3094f752 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -188,16 +188,20 @@ private MappedFields updateNameMapping(MappedFields mapping, Map field.getFieldId() != null); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java index 06b625c03..19f162a63 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java @@ -36,13 +36,10 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.mapping.MappingUtil; -import org.apache.iceberg.mapping.NameMappingParser; @AllArgsConstructor(staticName = "of") @Log4j2 @@ -88,14 +85,14 @@ Table getOrCreateTable( new Schema(), PartitionSpec.unpartitioned(), basePath, - getDefaultMappingProperties(schema))) + Collections.emptyMap())) .orElseGet( () -> getHadoopTables() .create( new Schema(), PartitionSpec.unpartitioned(), - getDefaultMappingProperties(schema), + Collections.emptyMap(), basePath)); // set the schema with the provided field IDs TableOperations operations = ((BaseTable) tableWithEmptySchema).operations(); @@ -112,11 +109,6 @@ Table getOrCreateTable( } } - private Map getDefaultMappingProperties(Schema schema) { - return Collections.singletonMap( - TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(MappingUtil.create(schema))); - } - private Optional getCatalog(IcebergCatalogConfig catalogConfig) { if (catalogConfig == null) { return Optional.empty(); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java index f424e3a96..413a16d29 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java @@ -40,12 +40,9 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import 
org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.mapping.MappingUtil; -import org.apache.iceberg.mapping.NameMappingParser; public class TestIcebergTableManager { private static final String BASE_PATH = "file:///basePath/"; @@ -117,10 +114,7 @@ void catalogGetOrCreateWithNewTable() { any(), eq(PartitionSpec.unpartitioned()), eq(BASE_PATH), - eq( - Collections.singletonMap( - TableProperties.DEFAULT_NAME_MAPPING, - NameMappingParser.toJson(MappingUtil.create(schema)))))) + eq(Collections.emptyMap()))) .thenReturn(mockInitialTable); when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable); @@ -164,14 +158,7 @@ void catalogGetOrCreateWithRaceConditionOnCreation() { Schema schema = new Schema(); PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); when(mockCatalog.createTable( - eq(IDENTIFIER), - any(), - any(), - eq(BASE_PATH), - eq( - Collections.singletonMap( - TableProperties.DEFAULT_NAME_MAPPING, - NameMappingParser.toJson(MappingUtil.create(schema)))))) + eq(IDENTIFIER), any(), any(), eq(BASE_PATH), eq(Collections.emptyMap()))) .thenThrow(new AlreadyExistsException("Table already exists")); when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable); From baf9d33bf8a6436ae2cfb8255117285600b41e5c Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Thu, 18 Dec 2025 17:06:01 -0800 Subject: [PATCH 07/12] update comments --- .../org/apache/xtable/iceberg/IcebergSchemaExtractor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 1b13a2983..1b2571897 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -109,8 +109,8 @@ private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldId public Schema toIceberg(InternalSchema internalSchema) { // if field IDs are not assigned in the source, just use an incrementing integer AtomicInteger fieldIdTracker = new AtomicInteger(0); - // traverse the schema before converting it to make sure fieldIdTracker won't return any - // fieldIds used in the schema + // traverse the schema before conversion to ensure fieldIdTracker won't return any + // fieldIds that are already present in the schema initializeFieldIdTracker(internalSchema, fieldIdTracker); // since IcebergSchemaExtractor is used as a singleton, idToStorageName may contain the results // extracted in the last run. 
To reflect the latest schema, it should be reset before the schema From 88ac8c09c697cfcf758cac4f8e728a9b59f0cd8c Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Fri, 19 Dec 2025 02:48:27 -0800 Subject: [PATCH 08/12] Create schema extractor per conversion, add test cases for schema evolution --- .../xtable/model/schema/InternalField.java | 4 ++-- .../iceberg/IcebergSchemaExtractor.java | 7 +------ .../apache/xtable/ITConversionController.java | 16 +++++++++++++- .../apache/xtable/TestSparkDeltaTable.java | 12 +++++++++++ .../apache/xtable/delta/TestDeltaHelper.java | 21 +++++++++++++++++++ 5 files changed, 51 insertions(+), 9 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java index 32d4da262..7f2097610 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java @@ -43,8 +43,8 @@ public class InternalField { // The id field for the field. This is used to identify the field in the schema even after // renames. Integer fieldId; - // The name of the column in the data file used to store this field when it differs from the name - // in table's definition + // The name of the column in the data file used to store this field if it differs from the name in + // the table's definition; otherwise, null @Getter String storageName; // represents the fully qualified path to the field (dot separated) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 1b2571897..4093f4ee2 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -54,14 +54,13 @@ @Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class IcebergSchemaExtractor { - private static final IcebergSchemaExtractor INSTANCE = new IcebergSchemaExtractor(); private static final String MAP_KEY_FIELD_NAME = "key"; private static final String MAP_VALUE_FIELD_NAME = "value"; private static final String LIST_ELEMENT_FIELD_NAME = "element"; @Getter private final Map idToStorageName = new HashMap<>(); public static IcebergSchemaExtractor getInstance() { - return INSTANCE; + return new IcebergSchemaExtractor(); } private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger fieldIdTracker) { @@ -112,10 +111,6 @@ public Schema toIceberg(InternalSchema internalSchema) { // traverse the schema before conversion to ensure fieldIdTracker won't return any // fieldIds that are already present in the schema initializeFieldIdTracker(internalSchema, fieldIdTracker); - // since IcebergSchemaExtractor is used as a singleton, idToStorageName may contain the results - // extracted in the last run. To reflect the latest schema, it should be reset before the schema - // extraction. 
- idToStorageName.clear(); List nestedFields = convertFields(internalSchema, fieldIdTracker); List recordKeyFields = internalSchema.getRecordKeyFields(); boolean recordKeyFieldsAreNotRequired = diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 959d1f01d..4200e51fd 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -765,7 +765,21 @@ public void testColumnMappingEnabledDeltaToIceberg() { null, null); conversionController.sync(conversionConfig, conversionSourceProvider); - checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 20); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 40); + + table.dropColumn("long_field"); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 50); + + table.renameColumn("double_field", "scores"); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 60); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index 35787d84a..428aaf332 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -276,4 +276,16 @@ public List getColumnsToSelect() { .filter(columnName -> !columnName.equals("yearOfBirth")) .collect(Collectors.toList()); } + + public void dropColumn(String colName) { + testDeltaHelper.dropColumn(colName); + sparkSession.sql(String.format("ALTER TABLE delta.`%s` DROP COLUMN %s", basePath, colName)); + } + + public void renameColumn(String colName, String newColName) { + testDeltaHelper.renameColumn(colName, newColName); + sparkSession.sql( + String.format( + "ALTER TABLE delta.`%s` RENAME COLUMN %s TO %s", basePath, colName, newColName)); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java index cacce9506..bbebe7eae 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java @@ -315,4 +315,25 @@ public List generateRowsForSpecificPartition(int numRows, int partitionValu .mapToObj(i -> generateRandomRowForGivenYearAndLevel(partitionValue, level)) .collect(Collectors.toList()); } + + public void dropColumn(String colName) { + this.tableStructSchema = + new StructType( + Arrays.stream(tableStructSchema.fields()) + .filter(field -> field.name() != colName) + .toArray(StructField[]::new)); + } + + public void renameColumn(String colName, String newColName) { + this.tableStructSchema = + new StructType( + Arrays.stream(tableStructSchema.fields()) + .map( + field -> + field.name().equals(colName) + ? 
new StructField( + newColName, field.dataType(), field.nullable(), field.metadata()) + : field) + .toArray(StructField[]::new)); + } } From 07dd0d172be68a3c165e89ce14b9bf3403d5e9e1 Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Fri, 19 Dec 2025 11:55:26 -0800 Subject: [PATCH 09/12] Add adding columns case for schema evolution test --- .../apache/xtable/iceberg/IcebergConversionTarget.java | 10 ++++------ .../java/org/apache/xtable/ITConversionController.java | 5 +++++ .../java/org/apache/xtable/TestSparkDeltaTable.java | 4 ++++ .../java/org/apache/xtable/delta/TestDeltaHelper.java | 4 ++++ 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index d3094f752..f103cced7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -190,14 +190,12 @@ public void syncSchema(InternalSchema schema) { Schema latestSchema = schemaExtractor.toIceberg(schema); String mappingJson = transaction.table().properties().get(TableProperties.DEFAULT_NAME_MAPPING); NameMapping mapping = - mappingJson == null + mappingJson == null || !schemaExtractor.getIdToStorageName().isEmpty() ? MappingUtil.create(latestSchema) : NameMappingParser.fromJson(mappingJson); - if (!schemaExtractor.getIdToStorageName().isEmpty()) { - mapping = - NameMapping.of( - updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName())); - } + mapping = + NameMapping.of( + updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName())); transaction .updateProperties() .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping)) diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 4200e51fd..019f69ea0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -780,6 +780,11 @@ public void testColumnMappingEnabledDeltaToIceberg() { table.insertRows(10); conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 60); + + table.addColumn(); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 70); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index 428aaf332..743206aa3 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -288,4 +288,8 @@ public void renameColumn(String colName, String newColName) { String.format( "ALTER TABLE delta.`%s` RENAME COLUMN %s TO %s", basePath, colName, newColName)); } + + public void addColumn() { + testDeltaHelper.addColumn(); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java index bbebe7eae..2e95e89b5 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java +++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java @@ -336,4 +336,8 @@ public void renameColumn(String colName, String newColName) { : field) .toArray(StructField[]::new)); } + + public void addColumn() { + this.tableStructSchema = tableStructSchema.add("city", StringType, true); + } } From 2368c5a533addf28ad53aaf16769eecf8006738b Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Sat, 20 Dec 2025 11:23:00 -0800 Subject: [PATCH 10/12] recreate the name mapping when source schema contains field IDs --- .../apache/xtable/iceberg/IcebergConversionTarget.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index f103cced7..3409d2775 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -189,8 +189,12 @@ private MappedFields updateNameMapping(MappedFields mapping, Map field.getFieldId() != null); + // Recreate name mapping when field IDs were provided in the source schema to ensure every + // field in the mapping was assigned the same ID as what is in the source schema NameMapping mapping = - mappingJson == null || !schemaExtractor.getIdToStorageName().isEmpty() + mappingJson == null || hasFieldIds ? MappingUtil.create(latestSchema) : NameMappingParser.fromJson(mappingJson); mapping = @@ -201,8 +205,6 @@ public void syncSchema(InternalSchema schema) { .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping)) .commit(); if (!transaction.table().schema().sameSchema(latestSchema)) { - boolean hasFieldIds = - schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null); if (hasFieldIds) { // There is no clean way to sync the schema with the provided field IDs using the // transaction API so we commit the current transaction and interact directly with From a8863bc59b61430feb2890d1d6083e1bd4c52678 Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Mon, 22 Dec 2025 00:00:23 -0800 Subject: [PATCH 11/12] Wrap single line if statement, add default case for switch, update name mapping only when schema changed --- .../iceberg/IcebergConversionTarget.java | 44 ++++++++++--------- .../iceberg/IcebergSchemaExtractor.java | 12 +++-- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index 3409d2775..a386ea870 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -169,17 +169,29 @@ private void initializeTableIfRequired(InternalTable internalTable) { } } - private MappedFields updateNameMapping(MappedFields mapping, Map updates) { + private void setNameMapping(NameMapping mapping) { + MappedFields updatedMappedFields = + updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName()); + transaction + .updateProperties() + .set( + TableProperties.DEFAULT_NAME_MAPPING, + NameMappingParser.toJson(NameMapping.of(updatedMappedFields))) + .commit(); + } + + private MappedFields updateNameMapping( + MappedFields mapping, Map idToStorageName) { if 
(mapping == null) { return null; } List fieldResults = new ArrayList<>(); for (MappedField field : mapping.fields()) { Set fieldNames = new HashSet<>(field.names()); - if (updates.containsKey(field.id())) { - fieldNames.add(updates.get(field.id())); + if (idToStorageName.containsKey(field.id())) { + fieldNames.add(idToStorageName.get(field.id())); } - MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), updates); + MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), idToStorageName); fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping)); } return MappedFields.of(fieldResults); @@ -188,24 +200,16 @@ private MappedFields updateNameMapping(MappedFields mapping, Map field.getFieldId() != null); - // Recreate name mapping when field IDs were provided in the source schema to ensure every - // field in the mapping was assigned the same ID as what is in the source schema - NameMapping mapping = - mappingJson == null || hasFieldIds - ? MappingUtil.create(latestSchema) - : NameMappingParser.fromJson(mappingJson); - mapping = - NameMapping.of( - updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName())); - transaction - .updateProperties() - .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping)) - .commit(); + if (!transaction.table().properties().containsKey(TableProperties.DEFAULT_NAME_MAPPING)) { + setNameMapping(MappingUtil.create(latestSchema)); + } if (!transaction.table().schema().sameSchema(latestSchema)) { + boolean hasFieldIds = + schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null); if (hasFieldIds) { + // Recreate name mapping when field IDs were provided in the source schema to ensure every + // field in the mapping was assigned the same ID as what is in the source schema + setNameMapping(MappingUtil.create(latestSchema)); // There is no clean way to sync the schema with the provided field IDs using the // transaction API so we commit the current transaction and interact directly with // the operations API. 
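
(Aside, not part of the patch: a minimal sketch of the kind of name mapping the updateNameMapping/setNameMapping logic above ends up persisting for a column-mapping-enabled Delta source. The field IDs and the physical column name "col-9ad437b1" are made-up placeholders, and the sketch only uses the Iceberg mapping classes already imported in IcebergConversionTarget.)

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.iceberg.mapping.MappedField;
    import org.apache.iceberg.mapping.MappedFields;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.mapping.NameMappingParser;

    public class NameMappingSketch {
      public static void main(String[] args) {
        // Field 1 is only known by its logical name; field 2 additionally lists a
        // Delta physical column name, so an Iceberg reader can locate it in Parquet
        // files that carry physical names instead of Iceberg field IDs.
        MappedFields fields =
            MappedFields.of(
                Arrays.asList(
                    MappedField.of(1, Collections.singletonList("name"), null),
                    MappedField.of(2, Arrays.asList("scores", "col-9ad437b1"), null)));
        // This JSON is the shape of what syncSchema stores under
        // TableProperties.DEFAULT_NAME_MAPPING.
        System.out.println(NameMappingParser.toJson(NameMapping.of(fields)));
      }
    }

Parquet files written by a column-mapping-enabled Delta table carry physical column names rather than Iceberg field IDs, so Iceberg readers fall back to this mapping to resolve columns; that is why updateNameMapping merges the storage names collected by the schema extractor into the per-ID name lists.
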
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 4093f4ee2..3ae415617 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -67,8 +67,9 @@ private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger field schema.getFields().stream() .forEach( field -> { - if (field.getFieldId() != null) + if (field.getFieldId() != null) { fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max); + } initializeFieldIdTracker(field, fieldIdTracker); }); } @@ -86,8 +87,9 @@ private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldId || InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) .forEach( mapField -> { - if (mapField.getFieldId() != null) + if (mapField.getFieldId() != null) { fieldIdTracker.accumulateAndGet(mapField.getFieldId(), Math::max); + } initializeFieldIdTracker(mapField, fieldIdTracker); }); return; @@ -98,10 +100,14 @@ private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldId InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(arrayField.getName())) .forEach( arrayField -> { - if (arrayField.getFieldId() != null) + if (arrayField.getFieldId() != null) { fieldIdTracker.accumulateAndGet(arrayField.getFieldId(), Math::max); + } initializeFieldIdTracker(arrayField, fieldIdTracker); }); + return; + default: + return; } } From 72c6dd148bd862e5b1db7b53bb415b7fefc48637 Mon Sep 17 00:00:00 2001 From: xr-chen <56777910+xr-chen@users.noreply.github.com> Date: Mon, 22 Dec 2025 00:47:41 -0800 Subject: [PATCH 12/12] update comments --- .../org/apache/xtable/iceberg/IcebergConversionTarget.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index a386ea870..3dbaec768 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -207,8 +207,8 @@ public void syncSchema(InternalSchema schema) { boolean hasFieldIds = schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null); if (hasFieldIds) { - // Recreate name mapping when field IDs were provided in the source schema to ensure every - // field in the mapping was assigned the same ID as what is in the source schema + // If field IDs are provided in the source schema, manually update the name mapping to + // ensure the IDs match the correct fields. setNameMapping(MappingUtil.create(latestSchema)); // There is no clean way to sync the schema with the provided field IDs using the // transaction API so we commit the current transaction and interact directly with