sungwy commented on code in PR #2255:
URL: https://github.com/apache/iceberg-python/pull/2255#discussion_r2283804734


##########
tests/table/test_delete_file_index.py:
##########


Review Comment:
   This is a very comprehensive test suite! I think we could also benefit from
introducing some integration tests, because if we get this wrong, we risk
introducing data integrity issues for our users.
   
   There are some great examples of integration tests in
[tests/integration/test_writes/test_writes.py](https://github.com/apache/iceberg-python/blob/8b43eb88fcc80b4980ce16b71852d211d7e93b13/tests/integration/test_writes/test_writes.py#L273)
that invoke a set of actions in either Spark or PyIceberg and then read the
result in both engines to assert that the outcome is the same through either.
   
   In our case, I think we could connect a Spark session and PyIceberg to the
same catalog and:
   - create a positional delete through PySpark
   - read the result in PyIceberg
   - read the result in Spark
   - assert that the two results are the same
   
   I think it would be good to cover a range of cases, as we did in this unit
test suite, roughly along the lines of the sketch below. WDYT?
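   
   A rough sketch of the Spark-produced positional-delete case, assuming the
existing `spark` and `session_catalog` fixtures from the integration suite and
a throwaway table name (a starting point, not a finished test):
   ```python
import pytest

from pyiceberg.catalog import Catalog


@pytest.mark.integration
def test_positional_deletes_match_spark(spark, session_catalog: Catalog) -> None:
    identifier = "default.test_positional_deletes_match_spark"

    # Produce a positional delete file through Spark (merge-on-read deletes on a v2 table)
    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
    spark.sql(
        f"CREATE TABLE {identifier} (id BIGINT, data STRING) USING iceberg "
        "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')"
    )
    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")

    # Read the same table back through both engines
    spark_rows = {tuple(row) for row in spark.table(identifier).collect()}
    tbl = session_catalog.load_table(identifier)
    pyiceberg_rows = {tuple(row.values()) for row in tbl.scan().to_arrow().to_pylist()}

    # Both readers should agree, and the deleted row should be gone
    assert pyiceberg_rows == spark_rows == {(1, "a"), (3, "c")}
   ```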



##########
pyiceberg/table/delete_file_index.py:
##########
@@ -0,0 +1,528 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from bisect import bisect_left
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from pyiceberg.conversions import from_bytes
+from pyiceberg.expressions import EqualTo
+from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
+from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, FileFormat, ManifestEntry
+from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.typedef import Record
+from pyiceberg.types import NestedField
+from pyiceberg.utils.partition_map import PartitionMap
+
+PATH_FIELD_ID = 2147483546
+
+
+class EqualityDeleteFileWrapper:
+    """Stores the equality delete file along with the sequence number."""
+
+    def __init__(self, manifest_entry: ManifestEntry, schema: Schema) -> None:
+        """Initialize a new EqualityDeleteFileWrapper.
+
+        Args:
+            manifest_entry: The manifest entry containing the delete file
+            schema: The table schema for field lookups
+        """
+        self.delete_file = manifest_entry.data_file
+        self.schema = schema
+        self.apply_sequence_number = (manifest_entry.sequence_number or 0) - 1
+        self._converted_lower_bounds: Optional[Dict[int, Any]] = None
+        self._converted_upper_bounds: Optional[Dict[int, Any]] = None
+        self._equality_fields: Optional[List[NestedField]] = None
+
+    def equality_fields(self) -> List[NestedField]:
+        """Get equality fields for current delete file.
+
+        Returns:
+            List of NestedField objects representing the equality fields
+        """
+        if self._equality_fields is None:
+            fields = []
+            for field_id in self.delete_file.equality_ids or []:
+                field = self.schema.find_field(field_id)
+                if field:
+                    fields.append(field)
+            self._equality_fields = fields
+        return self._equality_fields
+
+    def lower_bound(self, field_id: int) -> Optional[Any]:
+        """Convert or get lower bound for a field.
+
+        Args:
+            field_id: The field ID to get the bound for
+
+        Returns:
+            The converted lower bound value or None if not available
+        """
+        if self._converted_lower_bounds is None:
+            self._converted_lower_bounds = self._convert_bounds(self.delete_file.lower_bounds)
+        return self._converted_lower_bounds.get(field_id)
+
+    def upper_bound(self, field_id: int) -> Optional[Any]:
+        """Convert or get upper bound for a field.
+
+        Args:
+            field_id: The field ID to get the bound for
+
+        Returns:
+            The converted upper bound value or None if not available
+        """
+        if self._converted_upper_bounds is None:
+            self._converted_upper_bounds = self._convert_bounds(self.delete_file.upper_bounds)
+        return self._converted_upper_bounds.get(field_id)
+
+    def _convert_bounds(self, bounds: Dict[int, bytes]) -> Dict[int, Any]:
+        """Convert byte bounds to their proper types.
+
+        Args:
+            bounds: Dictionary mapping field IDs to byte bounds
+
+        Returns:
+            Dictionary mapping field IDs to converted bound values
+        """
+        if not bounds:
+            return {}
+
+        converted = {}
+        for field in self.equality_fields():
+            field_id = field.field_id
+            bound = bounds.get(field_id)
+            if bound is not None:
+                # Use the field type to convert the bound
+                converted[field_id] = from_bytes(field.field_type, bound)
+        return converted
+
+
+class PositionalDeleteFileWrapper:
+    """Stores the position delete file along with the sequence number for 
filtering."""
+
+    def __init__(self, manifest_entry: ManifestEntry):
+        """Initialize a new PositionalDeleteFileWrapper.
+
+        Args:
+            manifest_entry: The manifest entry containing the delete file
+        """
+        self.delete_file = manifest_entry.data_file
+        self.apply_sequence_number = manifest_entry.sequence_number or 0
+
+
+class DeletesGroup:
+    """Base class for managing collections of delete files with lazy sorting 
and binary search.
+
+    Provides O(1) insertion with deferred O(n log n) sorting and O(log n + k) filtering
+    where k is the number of matching delete files.
+    """
+
+    def __init__(self) -> None:
+        """Initialize a new DeletesGroup."""
+        self._buffer: Optional[List[Any]] = []
+        self._sorted: bool = False  # Lazy sorting flag
+        self._seqs: Optional[List[int]] = None
+        self._files: Optional[List[Any]] = None
+
+    def add(self, wrapper: Any) -> None:
+        """Add a delete file wrapper to the group.
+
+        Args:
+            wrapper: The delete file wrapper to add
+
+        Raises:
+            ValueError: If attempting to add files after indexing
+        """
+        if self._buffer is None:
+            raise ValueError("Can't add files to group after indexing")
+        self._buffer.append(wrapper)
+        self._sorted = False
+
+    def _index_if_needed(self) -> None:
+        """Sort wrappers by apply_sequence_number if not already sorted.
+
+        This method implements lazy sorting to avoid unnecessary work when
+        files are only added but not queried.
+        """
+        if not self._sorted:
+            self._files = sorted(self._buffer, key=lambda f: f.apply_sequence_number)  # type: ignore
+            self._seqs = [f.apply_sequence_number for f in self._files]
+            self._buffer = None
+            self._sorted = True
+
+    def _get_candidates(self, seq: int) -> List[Any]:
+        """Get delete files with apply_sequence_number >= seq using binary 
search.
+
+        Args:
+            seq: The sequence number to filter by
+
+        Returns:
+            List of delete file wrappers with sequence number >= seq
+        """
+        self._index_if_needed()
+
+        if not self._files or not self._seqs:
+            return []
+
+        start_idx = bisect_left(self._seqs, seq)
+
+        if start_idx >= len(self._files):
+            return []
+
+        return self._files[start_idx:]
+
+
+class EqualityDeletesGroup(DeletesGroup):
+    """Extends the base DeletesGroup with equality-specific filtering logic.
+
+    Uses file statistics and bounds to eliminate impossible matches before expensive operations.
+    This optimization significantly reduces the number of delete files that need to be processed
+    during scan planning.
+    """
+
+    def filter(self, seq: int, data_file: DataFile) -> List[DataFile]:
+        """Find equality deletes that could apply to the data file.
+
+        Args:
+            seq: The sequence number to filter by
+            data_file: The data file to check against
+
+        Returns:
+            List of delete files that may apply to the data file
+        """
+        candidates = self._get_candidates(seq)
+
+        matching_files = []
+        for wrapper in candidates:
+            if self._can_contain_eq_deletes_for_file(data_file, wrapper):
+                matching_files.append(wrapper.delete_file)
+
+        return matching_files
+
+    def _can_contain_eq_deletes_for_file(self, data_file: DataFile, delete_wrapper: EqualityDeleteFileWrapper) -> bool:
+        """Check if a data file might contain rows deleted by an equality delete file.
+
+        This method uses statistics (bounds and null counts) to determine if a delete file
+        could possibly match any rows in a data file, avoiding unnecessary processing.
+
+        Args:
+            data_file: The data file to check
+            delete_wrapper: The equality delete file wrapper
+
+        Returns:
+            True if the delete file might apply to the data file, False otherwise
+        """
+        data_lowers = data_file.lower_bounds
+        data_uppers = data_file.upper_bounds
+        delete_file = delete_wrapper.delete_file
+
+        # Check bounds and null counts if available
+        data_null_counts = data_file.null_value_counts or {}
+        data_value_counts = data_file.value_counts or {}
+        delete_null_counts = delete_file.null_value_counts or {}
+        delete_value_counts = delete_file.value_counts or {}
+
+        # Check each equality field
+        for field in delete_wrapper.equality_fields():
+            if not field.field_type.is_primitive:
+                continue
+            field_id = field.field_id
+
+            # Check null values
+            if not field.required:
+                data_has_nulls = data_null_counts.get(field_id, 0) > 0
+                delete_has_nulls = delete_null_counts.get(field_id, 0) > 0
+                if data_has_nulls and delete_has_nulls:
+                    continue
+
+                # If data is all nulls but delete doesn't delete nulls, no match
+                data_all_nulls = data_null_counts.get(field_id, 0) == data_value_counts.get(field_id, 0)
+                if data_all_nulls and not delete_has_nulls:
+                    return False
+
+                # If delete is all nulls but data has no nulls, no match
+                delete_all_nulls = delete_null_counts.get(field_id, 0) == delete_value_counts.get(field_id, 0)
+                if delete_all_nulls and not data_has_nulls:
+                    return False
+
+            # Check bounds overlap if available
+            if (
+                data_lowers is not None
+                and data_uppers is not None
+                and delete_file.lower_bounds is not None
+                and delete_file.upper_bounds is not None
+            ):
+                data_lower_bytes = data_lowers.get(field_id)
+                data_upper_bytes = data_uppers.get(field_id)
+                delete_lower = delete_wrapper.lower_bound(field_id)
+                delete_upper = delete_wrapper.upper_bound(field_id)
+
+                # If any bound is missing, assume they overlap
+                if data_lower_bytes is None or data_upper_bytes is None or delete_lower is None or delete_upper is None:
+                    continue
+
+                # converting data file bounds
+                data_lower = from_bytes(field.field_type, data_lower_bytes)
+                data_upper = from_bytes(field.field_type, data_upper_bytes)
+
+                # Check if bounds don't overlap
+                if data_lower > delete_upper or data_upper < delete_lower:
+                    return False
+
+        return True
+
+
+class PositionalDeletesGroup(DeletesGroup):
+    """Extends the base DeletesGroup with positional-specific filtering.
+
+    Uses file path evaluation to determine which deletes apply to which data files.
+    This class handles both path-specific position deletes and partition-level position deletes.
+    """
+
+    def _is_file_targeted_by_delete(self, delete_file: DataFile, data_file: DataFile) -> bool:
+        """Check if a position delete file targets a specific data file.
+
+        Args:
+            delete_file: The position delete file to check
+            data_file: The data file to check against
+
+        Returns:
+            True if the delete file targets the data file, False otherwise
+        """
+        has_path_bounds = (
+            delete_file.lower_bounds
+            and delete_file.upper_bounds
+            and PATH_FIELD_ID in delete_file.lower_bounds
+            and PATH_FIELD_ID in delete_file.upper_bounds
+        )
+
+        if not has_path_bounds:
+            # applies to all files in the partition
+            return True
+
+        try:
+            lower_path = delete_file.lower_bounds[PATH_FIELD_ID].decode("utf-8")
+            upper_path = delete_file.upper_bounds[PATH_FIELD_ID].decode("utf-8")
+
+            if lower_path == upper_path and lower_path == data_file.file_path:
+                return True
+        except (UnicodeDecodeError, AttributeError):
+            # If we can't decode the path bounds, fall back to the metrics evaluator
+            pass
+
+        # Use metrics evaluator for more complex path matching
+        evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_file.file_path))
+        return evaluator.eval(delete_file)
+
+    def filter(self, seq: int, data_file: DataFile) -> List[DataFile]:
+        """Filter positional delete files that apply to the given sequence 
number and data file.
+
+        Args:
+            seq: The sequence number to filter by
+            data_file: The data file to check against
+
+        Returns:
+            List of delete files that apply to the data file
+        """
+        candidates = self._get_candidates(seq)
+
+        matching_files = []
+        for wrapper in candidates:
+            delete_file = wrapper.delete_file
+            if self._is_file_targeted_by_delete(delete_file, data_file):
+                matching_files.append(delete_file)
+
+        return matching_files
+
+
+class DeleteFileIndex:
+    """Index that organizes delete files by partition for efficient lookup 
during scan planning.
+
+    This class indexes delete files by type (equality or positional), 
partition, and path
+    to enable efficient lookup of delete files that apply to a given data file.
+    """
+
+    def __init__(self, table_schema: Schema, partition_specs: Optional[Dict[int, PartitionSpec]] = None) -> None:
+        """Initialize a DeleteFileIndex.
+
+        Args:
+            table_schema: The table schema for field lookups
+            partition_specs: Dictionary mapping spec IDs to PartitionSpec objects
+        """
+        self.table_schema = table_schema
+        self.partition_specs = partition_specs or {}
+
+        # Global deletes
+        self.global_eq_deletes = EqualityDeletesGroup()
+
+        # Partition-specific deletes
+        self.eq_deletes_by_partition: PartitionMap[EqualityDeletesGroup] = PartitionMap(self.partition_specs)
+        self.pos_deletes_by_partition: PartitionMap[PositionalDeletesGroup] = PartitionMap(self.partition_specs)
+
+        # Path-specific deletes
+        self.pos_deletes_by_path: Dict[str, PositionalDeletesGroup] = {}
+        self.dv: Dict[str, Tuple[DataFile, int]] = {}
+        self.dv_values: Optional[List[Tuple[DataFile, int]]] = None
+        self.dv_sorted: bool = False
+
+    def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Optional[Record] = None) -> None:
+        """Add delete file to the appropriate partition group based on its type.
+
+        Args:
+            manifest_entry: The manifest entry containing the delete file
+            partition_key: The partition key for the delete file, if applicable
+
+        Raises:
+            ValueError: If attempting to add multiple deletion vectors for the same data file
+        """
+        delete_file = manifest_entry.data_file
+
+        if delete_file.content == DataFileContent.EQUALITY_DELETES:
+            # Skip equality deletes without equality_ids
+            if not delete_file.equality_ids or len(delete_file.equality_ids) == 0:
+                return
+
+            wrapper = EqualityDeleteFileWrapper(manifest_entry, self.table_schema)
+
+            # Check if the partition spec is unpartitioned
+            is_unpartitioned = False
+            spec_id = delete_file.spec_id or 0
+
+            if spec_id in self.partition_specs:
+                spec = self.partition_specs[spec_id]
+                # A spec is unpartitioned when it has no partition fields
+                is_unpartitioned = spec.is_unpartitioned()
+
+            if is_unpartitioned:
+                # If the spec is unpartitioned, add to global deletes
+                self._add_to_partition_group(wrapper, None)
+            else:
+                # Otherwise, add to partition-specific deletes
+                self._add_to_partition_group(wrapper, partition_key)
+
+        elif delete_file.content == DataFileContent.POSITION_DELETES:
+            # Check if this is a deletion vector (Puffin format)
+            if delete_file.file_format == FileFormat.PUFFIN:
+                sequence_number = manifest_entry.sequence_number or 0
+                path = delete_file.file_path
+                self.dv[path] = (delete_file, sequence_number)

Review Comment:
   I think we should port over this uniqueness check on the Deletion Vectors: 
https://github.com/apache/iceberg/blob/0b599bc37e39b3893e443393136c7fddc8391d6d/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L507-L510
   ```suggestion
                   if path not in self.dv:
                       self.dv[path] = (delete_file, sequence_number)
                   else:
                       raise ValueError(f"Can't index multiple DVs for {path}: 
{self.dv[path]} and {(delete_file, sequence_number)}")
   ```
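   
   If we adopt that, a small unit test could pin the behaviour down. A rough
sketch, assuming the suggested check above is in place; it uses plain stand-in
objects rather than real `ManifestEntry`/`DataFile` instances and the module
path from this PR:
   ```python
from types import SimpleNamespace

import pytest

from pyiceberg.manifest import DataFileContent, FileFormat
from pyiceberg.schema import Schema
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.types import LongType, NestedField


def _dv_entry(path: str, sequence_number: int = 1) -> SimpleNamespace:
    # Stand-in for a ManifestEntry wrapping a Puffin deletion vector
    data_file = SimpleNamespace(
        content=DataFileContent.POSITION_DELETES,
        file_format=FileFormat.PUFFIN,
        file_path=path,
    )
    return SimpleNamespace(data_file=data_file, sequence_number=sequence_number)


def test_duplicate_dv_for_same_path_raises() -> None:
    schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True))
    index = DeleteFileIndex(schema)

    index.add_delete_file(_dv_entry("s3://bucket/deletes/dv-1.puffin"))  # type: ignore[arg-type]
    with pytest.raises(ValueError, match="Can't index multiple DVs"):
        index.add_delete_file(_dv_entry("s3://bucket/deletes/dv-1.puffin"))  # type: ignore[arg-type]
   ```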



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

