diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java index 31eb0ed41..7f2097610 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java @@ -43,6 +43,9 @@ public class InternalField { // The id field for the field. This is used to identify the field in the schema even after // renames. Integer fieldId; + // The name of the column in the data file used to store this field if it differs from the name in + // the table's definition; otherwise, null + @Getter String storageName; // represents the fully qualified path to the field (dot separated) @Getter(lazy = true) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index 1376f884e..5119f5f5c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -55,6 +55,7 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaSchemaExtractor { private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id"; + private static final String DELTA_COLUMN_MAPPING_NAME = "delta.columnMapping.physicalName"; private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor(); // Timestamps in Delta are microsecond precision by default private static final Map @@ -136,6 +137,10 @@ private InternalSchema toInternalSchema( field.metadata().contains(DELTA_COLUMN_MAPPING_ID) ? (int) field.metadata().getLong(DELTA_COLUMN_MAPPING_ID) : null; + String storageName = + field.metadata().contains(DELTA_COLUMN_MAPPING_NAME) + ? field.metadata().getString(DELTA_COLUMN_MAPPING_NAME) + : null; String fieldComment = field.getComment().isDefined() ? 
field.getComment().get() : null; InternalSchema schema = @@ -148,6 +153,7 @@ private InternalSchema toInternalSchema( return InternalField.builder() .name(field.name()) .fieldId(fieldId) + .storageName(storageName) .parentPath(parentPath) .schema(schema) .defaultValue( diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index b05089d0a..3dbaec768 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -20,9 +20,12 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import lombok.extern.log4j.Log4j2; @@ -39,6 +42,11 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; @@ -161,13 +169,47 @@ private void initializeTableIfRequired(InternalTable internalTable) { } } + private void setNameMapping(NameMapping mapping) { + MappedFields updatedMappedFields = + updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName()); + transaction + .updateProperties() + .set( + TableProperties.DEFAULT_NAME_MAPPING, + NameMappingParser.toJson(NameMapping.of(updatedMappedFields))) + .commit(); + } + + private MappedFields updateNameMapping( + MappedFields mapping, Map idToStorageName) { + if (mapping == null) { + return 
null; + } + List fieldResults = new ArrayList<>(); + for (MappedField field : mapping.fields()) { + Set fieldNames = new HashSet<>(field.names()); + if (idToStorageName.containsKey(field.id())) { + fieldNames.add(idToStorageName.get(field.id())); + } + MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), idToStorageName); + fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping)); + } + return MappedFields.of(fieldResults); + } + @Override public void syncSchema(InternalSchema schema) { Schema latestSchema = schemaExtractor.toIceberg(schema); + if (!transaction.table().properties().containsKey(TableProperties.DEFAULT_NAME_MAPPING)) { + setNameMapping(MappingUtil.create(latestSchema)); + } if (!transaction.table().schema().sameSchema(latestSchema)) { boolean hasFieldIds = schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null); if (hasFieldIds) { + // If field IDs are provided in the source schema, manually update the name mapping to + // ensure the IDs match the correct fields. + setNameMapping(MappingUtil.create(latestSchema)); // There is no clean way to sync the schema with the provided field IDs using the // transaction API so we commit the current transaction and interact directly with // the operations API. 
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 4366bc025..3ae415617 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import lombok.AccessLevel; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -53,18 +54,69 @@ @Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class IcebergSchemaExtractor { - private static final IcebergSchemaExtractor INSTANCE = new IcebergSchemaExtractor(); private static final String MAP_KEY_FIELD_NAME = "key"; private static final String MAP_VALUE_FIELD_NAME = "value"; private static final String LIST_ELEMENT_FIELD_NAME = "element"; + @Getter private final Map idToStorageName = new HashMap<>(); public static IcebergSchemaExtractor getInstance() { - return INSTANCE; + return new IcebergSchemaExtractor(); + } + + private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger fieldIdTracker) { + schema.getFields().stream() + .forEach( + field -> { + if (field.getFieldId() != null) { + fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max); + } + initializeFieldIdTracker(field, fieldIdTracker); + }); + } + + private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldIdTracker) { + switch (field.getSchema().getDataType()) { + case RECORD: + initializeFieldIdTracker(field.getSchema(), fieldIdTracker); + return; + case MAP: + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()) + || InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .forEach( + mapField -> { + if (mapField.getFieldId() != null) { + 
fieldIdTracker.accumulateAndGet(mapField.getFieldId(), Math::max); + } + initializeFieldIdTracker(mapField, fieldIdTracker); + }); + return; + case LIST: + field.getSchema().getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(arrayField.getName())) + .forEach( + arrayField -> { + if (arrayField.getFieldId() != null) { + fieldIdTracker.accumulateAndGet(arrayField.getFieldId(), Math::max); + } + initializeFieldIdTracker(arrayField, fieldIdTracker); + }); + return; + default: + return; + } } public Schema toIceberg(InternalSchema internalSchema) { // if field IDs are not assigned in the source, just use an incrementing integer AtomicInteger fieldIdTracker = new AtomicInteger(0); + // traverse the schema before conversion to ensure fieldIdTracker won't return any + // fieldIds that are already present in the schema + initializeFieldIdTracker(internalSchema, fieldIdTracker); List nestedFields = convertFields(internalSchema, fieldIdTracker); List recordKeyFields = internalSchema.getRecordKeyFields(); boolean recordKeyFieldsAreNotRequired = @@ -154,6 +206,9 @@ private List convertFields( List nestedFields = new ArrayList<>(schema.getFields().size()); for (int i = 0; i < schema.getFields().size(); i++) { InternalField field = schema.getFields().get(i); + if (field.getStorageName() != null) { + idToStorageName.put(ids.get(i), field.getStorageName()); + } nestedFields.add( Types.NestedField.of( ids.get(i), diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java index 06b625c03..19f162a63 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java @@ -36,13 +36,10 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import 
org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.mapping.MappingUtil; -import org.apache.iceberg.mapping.NameMappingParser; @AllArgsConstructor(staticName = "of") @Log4j2 @@ -88,14 +85,14 @@ Table getOrCreateTable( new Schema(), PartitionSpec.unpartitioned(), basePath, - getDefaultMappingProperties(schema))) + Collections.emptyMap())) .orElseGet( () -> getHadoopTables() .create( new Schema(), PartitionSpec.unpartitioned(), - getDefaultMappingProperties(schema), + Collections.emptyMap(), basePath)); // set the schema with the provided field IDs TableOperations operations = ((BaseTable) tableWithEmptySchema).operations(); @@ -112,11 +109,6 @@ Table getOrCreateTable( } } - private Map getDefaultMappingProperties(Schema schema) { - return Collections.singletonMap( - TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(MappingUtil.create(schema))); - } - private Optional getCatalog(IcebergCatalogConfig catalogConfig) { if (catalogConfig == null) { return Optional.empty(); diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index b8ea413bb..019f69ea0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -746,6 +746,48 @@ public void testIcebergCorruptedSnapshotRecovery() throws Exception { } } + @Test + public void testColumnMappingEnabledDeltaToIceberg() { + String tableName = getTableName(); + ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(DELTA); + try (TestSparkDeltaTable table = + TestSparkDeltaTable.forColumnMappingEnabled(tableName, tempDir, sparkSession, null)) { + table.insertRows(20); + 
ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + ConversionConfig conversionConfig = + getTableSyncConfig( + DELTA, + SyncMode.INCREMENTAL, + tableName, + table, + Collections.singletonList(ICEBERG), + null, + null); + conversionController.sync(conversionConfig, conversionSourceProvider); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 40); + + table.dropColumn("long_field"); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 50); + + table.renameColumn("double_field", "scores"); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 60); + + table.addColumn(); + table.insertRows(10); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 70); + } + } + @Test public void testMetadataRetention() throws Exception { String tableName = getTableName(); diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index 028eca1bb..743206aa3 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -69,12 +69,27 @@ public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning( return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true); } + public static TestSparkDeltaTable forColumnMappingEnabled( + String tableName, Path tempDir, 
SparkSession sparkSession, String partitionField) { + return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true, true); + } + public TestSparkDeltaTable( String name, Path tempDir, SparkSession sparkSession, String partitionField, boolean includeAdditionalColumns) { + this(name, tempDir, sparkSession, partitionField, includeAdditionalColumns, false); + } + + public TestSparkDeltaTable( + String name, + Path tempDir, + SparkSession sparkSession, + String partitionField, + boolean includeAdditionalColumns, + boolean enableColumnMapping) { try { this.tableName = name; this.basePath = initBasePath(tempDir, tableName); @@ -82,7 +97,8 @@ public TestSparkDeltaTable( this.partitionField = partitionField; this.includeAdditionalColumns = includeAdditionalColumns; this.testDeltaHelper = - TestDeltaHelper.createTestDataHelper(partitionField, includeAdditionalColumns); + TestDeltaHelper.createTestDataHelper( + partitionField, includeAdditionalColumns, enableColumnMapping); testDeltaHelper.createTable(sparkSession, tableName, basePath); this.deltaLog = DeltaLog.forTable(sparkSession, basePath); this.deltaTable = DeltaTable.forPath(sparkSession, basePath); @@ -260,4 +276,20 @@ public List getColumnsToSelect() { .filter(columnName -> !columnName.equals("yearOfBirth")) .collect(Collectors.toList()); } + + public void dropColumn(String colName) { + testDeltaHelper.dropColumn(colName); + sparkSession.sql(String.format("ALTER TABLE delta.`%s` DROP COLUMN %s", basePath, colName)); + } + + public void renameColumn(String colName, String newColName) { + testDeltaHelper.renameColumn(colName, newColName); + sparkSession.sql( + String.format( + "ALTER TABLE delta.`%s` RENAME COLUMN %s TO %s", basePath, colName, newColName)); + } + + public void addColumn() { + testDeltaHelper.addColumn(); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java index 
a677f57ee..2e95e89b5 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java @@ -117,14 +117,21 @@ public class TestDeltaHelper { StructType tableStructSchema; String partitionField; boolean includeAdditionalColumns; + boolean enableColumnMapping; public static TestDeltaHelper createTestDataHelper( String partitionField, boolean includeAdditionalColumns) { + return createTestDataHelper(partitionField, includeAdditionalColumns, false); + } + + public static TestDeltaHelper createTestDataHelper( + String partitionField, boolean includeAdditionalColumns, boolean enableColumnMapping) { StructType tableSchema = generateDynamicSchema(partitionField, includeAdditionalColumns); return TestDeltaHelper.builder() .tableStructSchema(tableSchema) .partitionField(partitionField) .includeAdditionalColumns(includeAdditionalColumns) + .enableColumnMapping(enableColumnMapping) .build(); } @@ -160,6 +167,11 @@ public void createTable(SparkSession sparkSession, String tableName, String base if (includeAdditionalColumns) { tableBuilder.addColumn("street", StringType); } + if (enableColumnMapping) { + tableBuilder.property("delta.minReaderVersion", "2"); + tableBuilder.property("delta.minWriterVersion", "5"); + tableBuilder.property("delta.columnMapping.mode", "name"); + } tableBuilder.execute(); } @@ -303,4 +315,29 @@ public List generateRowsForSpecificPartition(int numRows, int partitionValu .mapToObj(i -> generateRandomRowForGivenYearAndLevel(partitionValue, level)) .collect(Collectors.toList()); } + + public void dropColumn(String colName) { + this.tableStructSchema = + new StructType( + Arrays.stream(tableStructSchema.fields()) + .filter(field -> !field.name().equals(colName)) + .toArray(StructField[]::new)); + } + + public void renameColumn(String colName, String newColName) { + this.tableStructSchema = + new StructType( + Arrays.stream(tableStructSchema.fields()) + .map( + field -> 
+ field.name().equals(colName) + ? new StructField( + newColName, field.dataType(), field.nullable(), field.metadata()) + : field) + .toArray(StructField[]::new)); + } + + public void addColumn() { + this.tableStructSchema = tableStructSchema.add("city", StringType, true); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java index 824e22856..f842f99c8 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java @@ -1044,4 +1044,148 @@ public void testToIcebergWithNoFieldIdsSet() { Assertions.assertTrue( icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema))); } + + @Test + public void testToIcebergWithPartialFieldIdsSet() { + InternalSchema internalSchema = + InternalSchema.builder() + .name("testRecord") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("name") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("scores") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("array") + .dataType(InternalType.LIST) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_element") + .parentPath("scores") + .schema( + InternalSchema.builder() + .name("long") + .dataType(InternalType.LONG) + .isNullable(true) + .build()) + .fieldId(null) + .build())) + .build()) + .build(), + InternalField.builder() + .name("record_map") + .fieldId(3) + .schema( + InternalSchema.builder() + .name("map") + .dataType(InternalType.MAP) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key") + .parentPath("record_map") + .schema( + 
InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("_one_field_value") + .parentPath("record_map") + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("nested_int") + .fieldId(5) + .parentPath( + "record_map._one_field_value") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("primitive_map") + .fieldId(4) + .schema( + InternalSchema.builder() + .name("map") + .dataType(InternalType.MAP) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key") + .parentPath("primitive_map") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("_one_field_value") + .parentPath("primitive_map") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build()) + .build())) + .build(); + Schema icebergRepresentation = + new Schema( + Types.NestedField.optional(1, "name", Types.StringType.get()), + Types.NestedField.optional( + 2, "scores", Types.ListType.ofOptional(6, Types.LongType.get())), + Types.NestedField.optional( + 3, + "record_map", + Types.MapType.ofOptional( + 7, + 8, + Types.StringType.get(), + Types.StructType.of( + Arrays.asList( + Types.NestedField.optional( + 5, "nested_int", Types.IntegerType.get()))))), + Types.NestedField.required( + 4, + "primitive_map", + Types.MapType.ofRequired(9, 10, Types.StringType.get(), Types.IntegerType.get()))); + assertTrue(icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema))); + } } diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java index f424e3a96..413a16d29 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java @@ -40,12 +40,9 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.mapping.MappingUtil; -import org.apache.iceberg.mapping.NameMappingParser; public class TestIcebergTableManager { private static final String BASE_PATH = "file:///basePath/"; @@ -117,10 +114,7 @@ void catalogGetOrCreateWithNewTable() { any(), eq(PartitionSpec.unpartitioned()), eq(BASE_PATH), - eq( - Collections.singletonMap( - TableProperties.DEFAULT_NAME_MAPPING, - NameMappingParser.toJson(MappingUtil.create(schema)))))) + eq(Collections.emptyMap()))) .thenReturn(mockInitialTable); when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable); @@ -164,14 +158,7 @@ void catalogGetOrCreateWithRaceConditionOnCreation() { Schema schema = new Schema(); PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); when(mockCatalog.createTable( - eq(IDENTIFIER), - any(), - any(), - eq(BASE_PATH), - eq( - Collections.singletonMap( - TableProperties.DEFAULT_NAME_MAPPING, - NameMappingParser.toJson(MappingUtil.create(schema)))))) + eq(IDENTIFIER), any(), any(), eq(BASE_PATH), eq(Collections.emptyMap()))) .thenThrow(new AlreadyExistsException("Table already exists")); when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable);