Skip to content

Commit 8ed913b

Browse files
authored
Read partitioned tables with source field missing (#2367)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Following with the Java [solution](https://github.com/apache/iceberg/pull/11868/files) implementation on how to read partition specs when a source field was dropped. # Are these changes tested? Yes, added one integration tests, and one unit test # Are there any user-facing changes? No <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent e3070d4 commit 8ed913b

File tree

3 files changed

+60
-4
lines changed

3 files changed

+60
-4
lines changed

pyiceberg/partitioning.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
TimestampType,
5757
TimestamptzType,
5858
TimeType,
59+
UnknownType,
5960
UUIDType,
6061
)
6162
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros
@@ -222,11 +223,14 @@ def partition_type(self, schema: Schema) -> StructType:
222223
:return: A StructType that represents the PartitionSpec, with a NestedField for each PartitionField.
223224
"""
224225
nested_fields = []
226+
schema_ids = schema._lazy_id_to_field
225227
for field in self.fields:
226-
source_type = schema.find_type(field.source_id)
227-
result_type = field.transform.result_type(source_type)
228-
required = schema.find_field(field.source_id).required
229-
nested_fields.append(NestedField(field.field_id, field.name, result_type, required=required))
228+
if source_field := schema_ids.get(field.source_id):
229+
result_type = field.transform.result_type(source_field.field_type)
230+
nested_fields.append(NestedField(field.field_id, field.name, result_type, required=source_field.required))
231+
else:
232+
# Since the source field has been drop we cannot determine the type
233+
nested_fields.append(NestedField(field.field_id, field.name, UnknownType()))
230234
return StructType(*nested_fields)
231235

232236
def partition_to_path(self, data: Record, schema: Schema) -> str:

tests/integration/test_reads.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,3 +1083,19 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> None:
10831083

10841084
scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
10851085
assert len(scan.to_arrow()) > 0
1086+
1087+
1088+
@pytest.mark.integration
1089+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
1090+
def test_scan_source_field_missing_in_spec(catalog: Catalog, spark: SparkSession) -> None:
1091+
identifier = "default.test_dropped_field"
1092+
spark.sql(f"DROP TABLE IF EXISTS {identifier}")
1093+
spark.sql(f"CREATE TABLE {identifier} (foo int, bar int, jaz string) USING ICEBERG PARTITIONED BY (foo, bar)")
1094+
spark.sql(
1095+
f"INSERT INTO {identifier} (foo, bar, jaz) VALUES (1, 1, 'dummy data'), (1, 2, 'dummy data again'), (2, 1, 'another partition')"
1096+
)
1097+
spark.sql(f"ALTER TABLE {identifier} DROP PARTITION FIELD foo")
1098+
spark.sql(f"ALTER TABLE {identifier} DROP COLUMN foo")
1099+
1100+
table = catalog.load_table(identifier)
1101+
assert len(list(table.scan().plan_files())) == 3

tests/table/test_partitioning.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
TimestampType,
4848
TimestamptzType,
4949
TimeType,
50+
UnknownType,
5051
UUIDType,
5152
)
5253

@@ -165,6 +166,28 @@ def test_partition_spec_to_path() -> None:
165166
assert spec.partition_to_path(record, schema) == "my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
166167

167168

169+
def test_partition_spec_to_path_dropped_source_id() -> None:
170+
schema = Schema(
171+
NestedField(field_id=1, name="str", field_type=StringType(), required=False),
172+
NestedField(field_id=2, name="other_str", field_type=StringType(), required=False),
173+
NestedField(field_id=3, name="int", field_type=IntegerType(), required=True),
174+
)
175+
176+
spec = PartitionSpec(
177+
PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="my#str%bucket"),
178+
PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="other str+bucket"),
179+
# Point partition field to missing source id
180+
PartitionField(source_id=4, field_id=1002, transform=BucketTransform(num_buckets=25), name="my!int:bucket"),
181+
spec_id=3,
182+
)
183+
184+
record = Record("my+str", "( )", 10)
185+
186+
# Both partition field names and values should be URL encoded, with spaces mapping to plus signs, to match the Java
187+
# behaviour: https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204
188+
assert spec.partition_to_path(record, schema) == "my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
189+
190+
168191
def test_partition_type(table_schema_simple: Schema) -> None:
169192
spec = PartitionSpec(
170193
PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"),
@@ -178,6 +201,19 @@ def test_partition_type(table_schema_simple: Schema) -> None:
178201
)
179202

180203

204+
def test_partition_type_missing_source_field(table_schema_simple: Schema) -> None:
205+
spec = PartitionSpec(
206+
PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"),
207+
PartitionField(source_id=10, field_id=1001, transform=BucketTransform(num_buckets=25), name="int_bucket"),
208+
spec_id=3,
209+
)
210+
211+
assert spec.partition_type(table_schema_simple) == StructType(
212+
NestedField(field_id=1000, name="str_truncate", field_type=StringType(), required=False),
213+
NestedField(field_id=1001, name="int_bucket", field_type=UnknownType(), required=False),
214+
)
215+
216+
181217
@pytest.mark.parametrize(
182218
"source_type, value",
183219
[

0 commit comments

Comments
 (0)