gabeiglio commented on code in PR #2176: URL: https://github.com/apache/iceberg-python/pull/2176#discussion_r2214815588
########## pyiceberg/table/update/validate.py: ########## @@ -14,19 +14,267 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from bisect import bisect_left from typing import Iterator, Optional, Set from pyiceberg.exceptions import ValidationException from pyiceberg.expressions import BooleanExpression from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator -from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile +from pyiceberg.manifest import ( + INITIAL_SEQUENCE_NUMBER, + DataFile, + DataFileContent, + ManifestContent, + ManifestEntry, + ManifestEntryStatus, + ManifestFile, +) +from pyiceberg.partitioning import PartitionMap, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between from pyiceberg.typedef import Record VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE} +VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, Operation.OVERWRITE} + + +class _PositionDeletes: + # Indexed state + _seqs: list[int] = [] + _entries: list[ManifestEntry] = [] + + # Buffer used to hold files before indexing + _buffer: list[ManifestEntry] = [] + _indexed: bool = False + + def _index_if_needed(self) -> None: + if self._indexed is False: + self._entries = sorted(self._buffer, key=lambda entry: _get_sequence_number_or_raise(entry)) + self._seqs = [_get_sequence_number_or_raise(entry) for entry in self._entries] + self._indexed = True + + def add_entry(self, entry: ManifestEntry) -> None: + if self._indexed: + raise Exception("Can't add files upon indexing.") + self._buffer.append(entry) + + def filter(self, seq: int) -> 
list[ManifestEntry]: + self._index_if_needed() + start = _find_start_index(self._seqs, seq) + + if start >= len(self._entries): + return [] + + if start == 0: + return self.referenced_delete_files() + + matching_entries_count: int = len(self._entries) - start + return self._entries[matching_entries_count:] + + def referenced_delete_files(self) -> list[ManifestEntry]: + self._index_if_needed() + return self._entries + + def is_empty(self) -> bool: + self._index_if_needed() + return len(self._entries) > 0 + + +class _EqualityDeletes: + # Indexed state + _seqs: list[int] = [] + _entries: list[ManifestEntry] = [] + + # Buffer used to hold files before indexing + _buffer: list[ManifestEntry] = [] + _indexed: bool = False + + def _index_if_needed(self) -> None: + if self._indexed is False: + self._entries = sorted(self._buffer, key=lambda entry: _get_sequence_number_or_raise(entry)) + self._seqs = [_get_sequence_number_or_raise(entry) for entry in self._entries] + self._indexed = True + + def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None: + # TODO: Equality deletes should consider the spec to get the equality fields + if self._indexed: + raise Exception("Can't add files upon indexing.") + self._buffer.append(entry) + + def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]: + self._index_if_needed() + start = _find_start_index(self._seqs, seq) + + if start >= len(self._entries): + return [] + + if start == 0: + return self.referenced_delete_files() + + matching_entries_count: int = len(self._entries) - start + return self._entries[matching_entries_count:] + + def referenced_delete_files(self) -> list[ManifestEntry]: Review Comment: Yes, this makes sense. When I first implemented this, I started with DeleteFiles but moved to manifests for the sequence number. Will change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org