Fokko commented on code in PR #6233:
URL: https://github.com/apache/iceberg/pull/6233#discussion_r1028552098
##########
python/pyiceberg/table/__init__.py:
##########
@@ -199,16 +223,144 @@ def use_ref(self, name: str):
raise ValueError(f"Cannot scan unknown ref={name}")
- def select(self, *field_names: str) -> TableScan:
+ def select(self, *field_names: str) -> S:
if "*" in self.selected_fields:
return self.update(selected_fields=field_names)
return
self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
- def filter_rows(self, new_row_filter: BooleanExpression) -> TableScan:
+ def filter_rows(self, new_row_filter: BooleanExpression) -> S:
return self.update(row_filter=And(self.row_filter, new_row_filter))
- def filter_partitions(self, new_partition_filter: BooleanExpression) ->
TableScan:
+ def filter_partitions(self, new_partition_filter: BooleanExpression) -> S:
return self.update(partition_filter=And(self.partition_filter,
new_partition_filter))
- def with_case_sensitive(self, case_sensitive: bool = True) -> TableScan:
+ def with_case_sensitive(self, case_sensitive: bool = True) -> S:
return self.update(case_sensitive=case_sensitive)
+
+
+class ScanTask(ABC):
+ pass
+
+
+@dataclass(init=False)
+class FileScanTask(ScanTask):
+ file: DataFile
+ start: int
+ length: int
+
+ def __init__(self, data_file: DataFile, start: Optional[int] = None,
length: Optional[int] = None):
+ self.file = data_file
+ self.start = start or 0
+ self.length = length or data_file.file_size_in_bytes
+
+
+class _DictAsStruct(StructProtocol):
+ pos_to_name: Dict[int, str]
+ wrapped: Dict[str, Any]
+
+ def __init__(self, partition_type: StructType):
+ self.pos_to_name = {pos: field.name for pos, field in
enumerate(partition_type.fields)}
+
+ def wrap(self, to_wrap: Dict[str, Any]) -> _DictAsStruct:
+ self.wrapped = to_wrap
+ return self
+
+ def get(self, pos: int) -> Any:
+ return self.wrapped[self.pos_to_name[pos]]
+
+ def set(self, pos: int, value: Any) -> None:
+ raise NotImplementedError("Cannot set values in DictAsStruct")
+
+
+class DataScan(TableScan["DataScan"]):
+ def __init__(
+ self,
+ table: Table,
+ row_filter: Optional[BooleanExpression] = None,
+ partition_filter: Optional[BooleanExpression] = None,
+ selected_fields: Tuple[str] = ("*",),
+ case_sensitive: bool = True,
+ snapshot_id: Optional[int] = None,
+ options: Properties = EMPTY_DICT,
+ ):
+ super().__init__(table, row_filter, partition_filter, selected_fields,
case_sensitive, snapshot_id, options)
+
+ def _build_manifest_evaluator(self, spec_id: int) ->
Callable[[ManifestFile], bool]:
+ spec = self.table.specs()[spec_id]
+ return visitors.manifest_evaluator(spec, self.table.schema(),
self.partition_filter, self.case_sensitive)
+
+ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile],
bool]:
+ spec = self.table.specs()[spec_id]
+ partition_type = spec.partition_type(self.table.schema())
+ partition_schema = Schema(*partition_type.fields)
+
+ # TODO: project the row filter # pylint: disable=W0511
+ partition_expr = And(self.partition_filter, AlwaysTrue())
+
+ # TODO: remove the dict to struct wrapper by using a StructProtocol
record # pylint: disable=W0511
+ wrapper = _DictAsStruct(partition_type)
+ evaluator = visitors.expression_evaluator(partition_schema,
partition_expr, self.case_sensitive)
+
+ return lambda data_file: evaluator(wrapper.wrap(data_file.partition))
+
+ def plan_files(self) -> Iterator[ScanTask]:
+ snapshot = self.snapshot()
+ if not snapshot:
+ return
+
+ io = self.table.io
+
+ # step 1: filter manifests using partition summaries
+ # the filter depends on the partition spec used to write the manifest
file, so create a cache of filters for each spec id
+
+ manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] =
KeyDefaultDict(self._build_manifest_evaluator)
+
+ manifests = [
+ manifest_file
+ for manifest_file in snapshot.manifests(io)
+ if
manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
+ ]
+
+ # step 2: filter the data files in each manifest
+ # this filter depends on the partition spec used to write the manifest
file
+
+ partition_evaluators: Dict[int, Callable[[DataFile], bool]] =
KeyDefaultDict(self._build_partition_evaluator)
Review Comment:
I like this, very elegant!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]