
Conversation


@xr-chen xr-chen commented Dec 9, 2025


What is the purpose of the pull request

Currently, a column mapping enabled Delta table with array/map columns can't be converted into an Iceberg table using xTable because

  1. Delta doesn't generate field IDs for elements within an array column, or for the key and value within a map column
  2. For elements in an array column, or the key and value in a map column, the IDs generated by the fieldIdTracker variable may already be used elsewhere in the schema, which violates the field ID requirements of Iceberg's NestedField type.

For example, the schema of a Delta table with a string column name and an array column scores would look like:

{
    "type": "struct",
    "fields": [
        {
            "name": "name",
            "type": "string",
            "nullable": true,
            "metadata": {
                "delta.columnMapping.id": 1,
                "delta.columnMapping.physicalName": "name"
            }
        },
        {
            "name": "scores",
            "type": {
                "type": "array",
                "elementType": "long",
                "containsNull": true
            },
            "nullable": true,
            "metadata": {
                "delta.columnMapping.id": 2,
                "delta.columnMapping.physicalName": "scores"
            }
        }
    ]
}

In the above schema, there is no delta.columnMapping.id for the elements in the array column. Similarly, map columns don't have field IDs for their key and value:

{
  "type": "struct",
  "fields": [
    {
      "name": "name",
      "type": "string",
      "nullable": true,
      "metadata": {
        "delta.columnMapping.id": 1,
        "delta.columnMapping.physicalName": "name"
      }
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "keyType": "string",
        "valueType": "string",
        "valueContainsNull": true
      },
      "nullable": true,
      "metadata": {
        "delta.columnMapping.id": 2,
        "delta.columnMapping.physicalName": "properties"
      }
    }
  ]
}

Brief change log

Update the fieldIdTracker variable to hold the latest field ID every time an ID is obtained, whether from the source table schema or from fieldIdTracker itself, so that fieldIdTracker never returns an ID already present in the source table schema.
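
For illustration, a minimal sketch of the idea (a hypothetical helper, not the exact xTable code), assuming an AtomicInteger tracker like the one used in the extractor:

// Advance the tracker with every ID taken from the source schema so that
// generated IDs never collide with IDs that are already in use.
private int getOrAssignFieldId(InternalField field, AtomicInteger fieldIdTracker) {
  if (field.getFieldId() != null) {
    // Keep the source-provided ID, but remember the highest ID seen so far.
    fieldIdTracker.getAndUpdate(current -> Math.max(current, field.getFieldId()));
    return field.getFieldId();
  }
  // No ID in the source schema (e.g. a list element or map key/value from Delta):
  // hand out the next ID past everything already used.
  return fieldIdTracker.incrementAndGet();
}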

Verify this pull request

  • Added testToIcebergWithPartialFieldIdsSet to verify the change.

@xr-chen xr-chen changed the title Fix duplicate field id in converted Iceberg schema from columnMapping enabled Delta table Fix Delta to Iceberg not working on column mapping enabled Delta source table Dec 10, 2025
@the-other-tim-brown
Contributor

@xr-chen thank you for the bug report and contribution! There is a test class called ITConversionController that converts between the formats. Is it possible to update this test so it will trigger the same issue that you saw?

field -> {
  int id =
      field.getFieldId() == null
          ...
          : field.getFieldId())
Contributor

If the fieldId is set, we need to use that ID. That is how we are able to handle renames in the schema. The reader will look up the column based on the ID.
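
For illustration, a hypothetical Iceberg snippet of the rename scenario (not the xTable code path):

// After a rename, the field keeps its original ID, so data files written
// before the rename still resolve to the right column through that ID.
table.updateSchema().renameColumn("double_field", "scores").commit();
// "scores" now carries the same field ID that "double_field" had.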

Author
@xr-chen xr-chen Dec 10, 2025

If the fieldId is set, it still uses the ID in the schema. Here, I just updated the fieldIdTracker so that it won't return any ID that was already used.

.name("testRecord")
.dataType(InternalType.RECORD)
.isNullable(false)
.fields(
Contributor

Let's add another field that comes after the list to ensure the next field ID is chosen properly. For example, the field after the list is going to have ID 3. We want to make sure that this carries through to the Iceberg schema.

Let's make sure the Map case is also tested here to ensure there is no regression in the future.

Author

Does the field to be added come with an ID or not?

Author

Will test the Map case, that's a good point

Author

Map case and more fields after the list were added

Author

xr-chen commented Dec 10, 2025

Thanks @the-other-tim-brown for reviewing the changes. Do you mean adding a new test in that class for column mapping enabled Delta tables, maybe something like this one, but for Delta?

public void testIcebergCorruptedSnapshotRecovery() throws Exception {

Author

xr-chen commented Dec 11, 2025

The problem is actually more complicated than fixing the fieldId generation; the data files written by a column mapping enabled Delta table don't follow the table schema at all. We need to somehow map the columns in the data files to the columns in the schema.

root
 |-- col-4ee2e8c9-35f1-4868-8ff5-46d285fac4b2: integer (nullable = true)
 |-- col-89424a5c-1fbc-4f97-b9dc-c3bf90f9849e: string (nullable = true)
 |-- col-0c385cbd-8f94-4043-bb55-89f07a39a2bb: string (nullable = true)

Mapping info in the Delta table:

[
 {
   "name": "id",
   "type": "integer",
   "nullable": false,
   "metadata": {
     "delta.columnMapping.id": 1,
     "delta.columnMapping.physicalName": "col-4ee2e8c9-35f1-4868-8ff5-46d285fac4b2"
   }
 },
 {
   "name": "firstName",
   "type": "string",
   "nullable": true,
   "metadata": {
     "delta.columnMapping.id": 2,
     "delta.columnMapping.physicalName": "col-89424a5c-1fbc-4f97-b9dc-c3bf90f9849e"
   }
 },
 {
   "name": "lastName",
   "type": "string",
   "nullable": true,
   "metadata": {
     "delta.columnMapping.id": 3,
     "delta.columnMapping.physicalName": "col-0c385cbd-8f94-4043-bb55-89f07a39a2bb"
   }
 }
]

}

@Test
public void testColumnMappingEnabledDeltaToIceberg() {
Contributor

@xr-chen to answer your question, yes this is exactly what I was looking for.

Do you think we should also do some minor schema evolution in this case?

Author
@xr-chen xr-chen Dec 11, 2025

@the-other-tim-brown Yes, I think so, but the code actually can't pass this test case right now, so it probably won't work on any rename-column type of schema change. It seems that only populating fieldId isn't enough: the converted Iceberg table doesn't know which 'physical' column in the data file to read for a 'logical' column name in the table schema, and it returns null values for all columns if we read from the generated Iceberg table. This is probably because:

  1. We don't extract delta.columnMapping.physicalName from the Delta table's schema, so we don't know where the column is actually stored
  2. The converted Iceberg table doesn't have a name mapping to recognize which Parquet column corresponds to a given Iceberg field ID (see the sketch below)
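
For illustration, an Iceberg name mapping along these lines (the schema.name-mapping.default table property), sketched here with the physical names from the example above, would let readers resolve each field ID to its physical Parquet column:

[
  { "field-id": 1, "names": [ "id", "col-4ee2e8c9-35f1-4868-8ff5-46d285fac4b2" ] },
  { "field-id": 2, "names": [ "firstName", "col-89424a5c-1fbc-4f97-b9dc-c3bf90f9849e" ] },
  { "field-id": 3, "names": [ "lastName", "col-0c385cbd-8f94-4043-bb55-89f07a39a2bb" ] }
]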

Author

I added Delta's physical column names into Iceberg's schema.name-mapping.default, and the converted table can now read data from the correct place and returns the same content as the original Delta table. But I ran into a weird issue during testing:

  • All tests pass with this new test disabled when running mvn verify
  • This test passes when run independently
  • When running this test together with all the other tests, ITConversionController.testVariousOperations fails

@the-other-tim-brown, is there anything shared among the test cases?

Contributor

The filesystem is shared between the tests, along with the Hadoop and Spark configurations.

Author

Ahh, it's due to the idToStorageName field I added to IcebergSchemaExtractor: the field wasn't reset before a new sync run, and the schema extractor is used as a singleton, so previous extraction results carried over to the next run, which failed the test. Now mvn clean verify passes on my end.

Contributor

Good catch on this, thanks for digging into the issue. I added a comment on that class.

Contributor

Now that you have this working, should we add some schema evolution here?

I think we should at least add a second commit as a sanity check that everything works as expected.

Author

Is there an existing function I can use to change the schema of the source Delta table for testing, or should I implement it myself?

By a second commit, do you mean inserting more records with insertRows and syncing the table again to make sure it works in incremental sync mode as well?

Contributor

We already have some helpers for this. Earlier in this test class you will see a test case that uses GenericTable.getInstanceWithAdditionalColumns. This has some helpers for creating the evolved Delta table schema under the hood that you should be able to build off of.

And yes, just inserting or updating some more records is fine. I just want to ensure there isn't some unexpected side-effect when we set this table property multiple times for Iceberg.

// The id field for the field. This is used to identify the field in the schema even after
// renames.
Integer fieldId;
@Getter String storageName;
Contributor

Can you add a comment for this field?

Author

sure, will add

Author

Comment added.

Contributor

@xr-chen "name mapping" is a delta specific concept. The comment should describe more generally what is happening here. Something like The name of the column in the data file used to store this field when it differs from the name in table's definition
The comment should also describe whether this will be null when the names are the same or if the string is expected to be populated

Author

updated

Contributor

@xr-chen will this value be null if it is not different?

Author

Yes, it will be null

Contributor

Can you add that detail to the comment?
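
For illustration, the resulting field comment might read something like this (a sketch combining both review points):

// The name of the column in the data file used to store this field when it
// differs from the name in the table's definition; null when the two names
// are the same.
@Getter String storageName;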

}

private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, String> updates) {
  if (mapping == null) {
Contributor

Can mapping ever be null? If null is returned, will NameMapping.of not throw an exception?

Author
@xr-chen xr-chen Dec 16, 2025

It's mainly used to end the recursion; all nested fields of a given field are processed with this function as well, and in that case having null as a nested field actually makes sense. As for the mapping we want to update, I believe MappingUtil.create won't return null.
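
For illustration, a sketch of how such a recursive update could look, assuming Iceberg's MappedField/MappedFields factories (the actual PR code may differ):

private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, String> idToStorageName) {
  if (mapping == null) {
    // Leaf field: nothing nested to rewrite, and a null nested mapping is valid.
    return null;
  }
  List<MappedField> updated = new ArrayList<>();
  for (MappedField field : mapping.fields()) {
    Set<String> names = new HashSet<>(field.names());
    String storageName = idToStorageName.get(field.id());
    if (storageName != null) {
      // Expose the physical column name so readers can resolve the field by ID.
      names.add(storageName);
    }
    updated.add(
        MappedField.of(
            field.id(), names, updateNameMapping(field.nestedMapping(), idToStorageName)));
  }
  return MappedFields.of(updated);
}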

public void syncSchema(InternalSchema schema) {
  Schema latestSchema = schemaExtractor.toIceberg(schema);
  if (!schemaExtractor.getIdToStorageName().isEmpty()) {
    NameMapping mapping = MappingUtil.create(latestSchema);
Contributor

The IcebergTableManager is also setting the default name mapping, should we remove that and just rely on this?

Author

I think it makes sense; it's better to update the name mapping in a single place

Author

I attempted to move the code for setting the default name mapping from IcebergTableManager to here, but some test cases failed; is there a reason the mapping should be initialized there?

Contributor

There is no special reason. It is just there from when I first set this up. The mapping needs to be there to handle the case where the field ID is not set in the file schema as you have seen.

Author

I see, I've moved the name mapping code to the schema sync function now

private static final String MAP_KEY_FIELD_NAME = "key";
private static final String MAP_VALUE_FIELD_NAME = "value";
private static final String LIST_ELEMENT_FIELD_NAME = "element";
@Getter private final Map<Integer, String> idToStorageName = new HashMap<>();
Contributor

@xr-chen This adds state to the class so we have to decide if we want to make an instance of this class per conversion or remove this state.

Removing the state would require you to return the map as part of the response from toIceberg.

I don't have a strong opinion either way, but I would prefer that over the clear call in the toIceberg method.
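
For illustration, a sketch of the stateless alternative (names hypothetical): return the mapping alongside the converted schema instead of keeping it on the extractor.

// Hypothetical result wrapper; the extractor would build both values per call
// and hold no state between conversions.
@Value
public class SchemaConversionResult {
  Schema icebergSchema;
  Map<Integer, String> idToStorageName;
}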

Author

Good suggestion; this will avoid unexpected outcomes caused by adding state to a singleton object. I will update the code.

conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 50);

table.renameColumn("double_field", "scores");
Contributor

Have you sanity checked that the final delta table matches your expectations in terms of the schema and data files written?

Author

The final schema does match the expected results: the dropped column long_field is gone, the double_field column is renamed to scores, and a new column city is added.

[Screenshot: final table schema]

The data files still use random strings as column names; nothing should change there after Delta's schema evolution.

[Screenshot: data file column names]

Contributor

Thanks for confirming!

}

private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldIdTracker) {
  switch (field.getSchema().getDataType()) {
Contributor

Can we add a default case for this switch?

Author

The default case would be the types of fields that can't be assigned an ID, and in that case it should just return from this function. So you want an explicit return statement in the default case?

Contributor

Yes, I think that is more clear. The linter is flagging this for me locally.
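
For illustration, the suggested shape might look something like this (a simplified sketch, not the exact PR code):

switch (field.getSchema().getDataType()) {
  case RECORD:
  case LIST:
  case MAP:
    // Descend into the nested fields and record their IDs in the tracker...
    break;
  default:
    // Primitive types carry no nested fields, so there is nothing to track.
    return;
}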

        || InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
    .forEach(
        mapField -> {
          if (mapField.getFieldId() != null)
Contributor

For the if statements in this class, let's wrap the body with {} so it is clearer what is contained in the conditional.

        : NameMappingParser.fromJson(mappingJson);
mapping =
    NameMapping.of(
        updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName()));
Contributor

The updateNameMapping takes in updates, but it seems like it receives all of the idToStorageName values. Is that intentional?

Author

Yeah, the variable name is confusing. Let me use a different name for this argument; how about idToStorageName or storageNameForId?

Contributor

idToStorageName sounds good to me

Comment on lines 194 to 199
// Recreate name mapping when field IDs were provided in the source schema to ensure every
// field in the mapping was assigned the same ID as what is in the source schema
NameMapping mapping =
    mappingJson == null || hasFieldIds
        ? MappingUtil.create(latestSchema)
        : NameMappingParser.fromJson(mappingJson);
Contributor

Does it make sense to always create the name mapping based off of the latest schema?

If I remember correctly, this name mapping will always be useful if the field IDs are not persisted to the source data files, so I think we'll always want an up-to-date mapping.

Author

I think it depends on whether the source table has field IDs in its schema and how we update the schema. For a source table that doesn't have field IDs in its schema, if I understand the code correctly, the current implementation will make updates using the schema update API. For example, for a table without field IDs that has three columns of int, array, and string type, the generated Iceberg schema will look like:

1 int
2 array
  4 - elements
3 string

If we add a double column to this table, with the schema update API, the next available integer will be used for the new column

1 int
2 array
  4 - elements
3 string
5 double

But if we recreate the mapping from the converted schema with auto-assigned IDs in this case, the structure of the mapping will be:

1 int
2 array
 5 - elements
3 string
4 double

which doesn't align with the schema anymore. So I think we should recreate the mapping from the latest schema only when the source table uses field IDs to identify columns in the data files.

Contributor

We always need to generate the name mapping though. If you don't update it and the schema changes, then you will be missing fields.

The ID assignment should happen when generating the latestSchema based on my reading of the code. Is that wrong?

Author

I remember Iceberg updates the name mapping when this property is set and the schema is updated; I can verify that with a test case.

You are right, the ID assignment happens at that time, but Iceberg's schema update API just chooses the next available integer as the field ID for the new field instead of reading the IDs from the latestSchema.

Author
@xr-chen xr-chen Dec 22, 2025

Just tested: Iceberg automatically updates the name mapping as well when the schema is updated with UpdateSchema. For example, when a new column city is added, the corresponding entry can also be found in the name mapping. Schema:

{
  "id" : 19,
  "name" : "street",
  "required" : false,
  "type" : "string"
}, {
  "id" : 29,
  "name" : "city",
  "required" : false,
  "type" : "string"
}

Name mapping:

{
  "field-id" : 19,
  "names" : [ "street" ]
}, {
  "field-id" : 29,
  "names" : [ "city" ]
}

It's unnecessary to recreate the name mapping based on the latest schema in this case, and recreating might result in a name mapping that no longer aligns with the table schema. For example, suppose the field ID of city is 20 in the latestSchema: if no field IDs are provided in the source table, the name mapping created from latestSchema will also map city to 20, but in the table schema 20 is already assigned to one of the nested fields, which is of course not the field city.

And when the schema is updated with the operations API, the name mapping will be recreated based on the latest schema to reflect the changes.

Author

@the-other-tim-brown I tried always creating the name mapping with the latest schema, but some tests in ITConversionController.testVariousOperations failed due to the mismatch between the schema and the name mapping when the schema is updated via UpdateSchema

.updateProperties()
.set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
.commit();
if (!transaction.table().schema().sameSchema(latestSchema)) {
Contributor

I think we should move the properties update to this block as well so we don't need to issue as many updates.
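
For illustration, a sketch of the consolidated shape (simplified; reuses the names from the quoted snippet):

// Only touch the table when the schema actually changed, and set the
// name-mapping property in the same pass to avoid an extra update.
if (!transaction.table().schema().sameSchema(latestSchema)) {
  transaction
      .updateProperties()
      .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
      .commit();
  // ...followed by the existing schema update on the same transaction
}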



Successfully merging this pull request may close these issues.

Delta table to Iceberg doesn't work on column mapping enabled Delta source table
