Skip to content

Commit 7f525e7

Browse files
committed
feat: Cache ResidualEvaluator
Fixes #2147 - Implement ResidualEvaluatorCache with LRU eviction and thread safety - Cache evaluators by partition spec, expression, case sensitivity, and schema - Fix mypy type annotations and add type ignore for cachetools decorator
1 parent bb41a6d commit 7f525e7

File tree

2 files changed

+121
-6
lines changed

2 files changed

+121
-6
lines changed

pyiceberg/expressions/visitors.py

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
import hashlib
1718
import math
19+
import threading
1820
from abc import ABC, abstractmethod
1921
from functools import singledispatch
2022
from typing import (
@@ -23,6 +25,7 @@
2325
Dict,
2426
Generic,
2527
List,
28+
Optional,
2629
Set,
2730
SupportsFloat,
2831
Tuple,
@@ -1975,11 +1978,123 @@ def residual_for(self, partition_data: Record) -> BooleanExpression:
19751978
return self.expr
19761979

19771980

1981+
# =============================================================================
1982+
# ADD THESE BEFORE THE ResidualEvaluator CLASS DEFINITION
1983+
# =============================================================================
1984+
1985+
_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE = 128
1986+
1987+
1988+
class ResidualEvaluatorCache:
1989+
"""Thread-safe LRU cache for ResidualEvaluator instances.
1990+
1991+
Caches ResidualEvaluators to avoid repeated instantiation and initialization
1992+
overhead when scanning multiple data files with identical partition specs,
1993+
expressions, schemas, and case sensitivity settings.
1994+
"""
1995+
1996+
_cache: Dict[str, ResidualEvaluator]
1997+
_maxsize: int
1998+
_lock: threading.RLock
1999+
2000+
def __init__(self, maxsize: int = _DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE) -> None:
2001+
"""Initialize the cache.
2002+
2003+
Args:
2004+
maxsize: Maximum number of evaluators to cache. Defaults to 128.
2005+
"""
2006+
self._cache = {}
2007+
self._maxsize = maxsize
2008+
self._lock = threading.RLock()
2009+
2010+
@staticmethod
2011+
def _make_key(
2012+
spec_id: int,
2013+
expr: BooleanExpression,
2014+
case_sensitive: bool,
2015+
schema_id: Optional[int] = None,
2016+
) -> str:
2017+
"""Create deterministic cache key from evaluator parameters.
2018+
2019+
Args:
2020+
spec_id: Partition spec identifier.
2021+
expr: Filter expression tree.
2022+
case_sensitive: Case-sensitive flag.
2023+
schema_id: Optional schema identifier.
2024+
2025+
Returns:
2026+
32-character MD5 hex string cache key.
2027+
"""
2028+
key_parts = f"{spec_id}#{repr(expr)}#{case_sensitive}#{schema_id}"
2029+
return hashlib.md5(key_parts.encode()).hexdigest()
2030+
2031+
def get(
2032+
self,
2033+
spec: PartitionSpec,
2034+
expr: BooleanExpression,
2035+
case_sensitive: bool,
2036+
schema: Schema,
2037+
) -> Optional[ResidualEvaluator]:
2038+
"""Retrieve cached evaluator if it exists.
2039+
2040+
Args:
2041+
spec: Partition specification.
2042+
expr: Filter expression.
2043+
case_sensitive: Case sensitivity flag.
2044+
schema: Table schema.
2045+
2046+
Returns:
2047+
Cached ResidualEvaluator or None.
2048+
"""
2049+
cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
2050+
with self._lock:
2051+
return self._cache.get(cache_key)
2052+
2053+
def put(
2054+
self,
2055+
spec: PartitionSpec,
2056+
expr: BooleanExpression,
2057+
case_sensitive: bool,
2058+
schema: Schema,
2059+
evaluator: ResidualEvaluator,
2060+
) -> None:
2061+
"""Cache a ResidualEvaluator instance.
2062+
2063+
Args:
2064+
spec: Partition specification.
2065+
expr: Filter expression.
2066+
case_sensitive: Case sensitivity flag.
2067+
schema: Table schema.
2068+
evaluator: ResidualEvaluator to cache.
2069+
"""
2070+
cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
2071+
with self._lock:
2072+
if len(self._cache) >= self._maxsize:
2073+
oldest_key = next(iter(self._cache))
2074+
del self._cache[oldest_key]
2075+
self._cache[cache_key] = evaluator
2076+
2077+
def clear(self) -> None:
2078+
"""Clear all cached evaluators."""
2079+
with self._lock:
2080+
self._cache.clear()
2081+
2082+
2083+
_residual_evaluator_cache = ResidualEvaluatorCache()
2084+
2085+
19782086
def residual_evaluator_of(
19792087
spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
19802088
) -> ResidualEvaluator:
1981-
return (
1982-
UnpartitionedResidualEvaluator(schema=schema, expr=expr)
1983-
if spec.is_unpartitioned()
1984-
else ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
1985-
)
2089+
cached = _residual_evaluator_cache.get(spec, expr, case_sensitive, schema)
2090+
if cached is not None:
2091+
return cached
2092+
2093+
evaluator: ResidualEvaluator
2094+
if spec.is_unpartitioned():
2095+
evaluator = UnpartitionedResidualEvaluator(schema=schema, expr=expr)
2096+
else:
2097+
evaluator = ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
2098+
2099+
_residual_evaluator_cache.put(spec, expr, case_sensitive, schema, evaluator)
2100+
return evaluator

tests/utils/test_manifest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
@pytest.fixture(autouse=True)
4949
def clear_global_manifests_cache() -> None:
5050
# Clear the global cache before each test
51-
_manifests.cache_clear()
51+
_manifests.cache_clear() # type: ignore
5252

5353

5454
def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:

0 commit comments

Comments
 (0)