kevinjqliu commented on code in PR #1427: URL: https://github.com/apache/iceberg-python/pull/1427#discussion_r1884198201
########## pyiceberg/table/__init__.py: ########## @@ -191,6 +193,15 @@ class TableProperties: DELETE_MODE_MERGE_ON_READ = "merge-on-read" DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE + READ_SPLIT_SIZE = "read.split.target-size" Review Comment: let's add these options to the docs, similar to https://py.iceberg.apache.org/configuration/#write-options ########## pyiceberg/table/__init__.py: ########## @@ -1423,6 +1451,66 @@ def plan_files(self) -> Iterable[FileScanTask]: for data_entry in data_entries ] + def _target_split_size(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, TableProperties.READ_SPLIT_SIZE, TableProperties.READ_SPLIT_SIZE_DEFAULT + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_SIZE, table_value) # type: ignore + + def _loop_back(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, TableProperties.READ_SPLIT_LOOKBACK, TableProperties.READ_SPLIT_LOOKBACK_DEFAULT + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_LOOKBACK, table_value) # type: ignore + + def _split_open_file_cost(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, + TableProperties.READ_SPLIT_OPEN_FILE_COST, + TableProperties.READ_SPLIT_OPEN_FILE_COST_DEFAULT, + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_OPEN_FILE_COST, table_value) # type: ignore + + def plan_task(self) -> Iterable[CombinedFileScanTask]: + """Plan balanced combined tasks for this scan by splitting large and combining small tasks. 
+ + Returns: + List of CombinedFileScanTasks + """ + split_size = self._target_split_size() + loop_back = self._loop_back() + open_file_cost = self._split_open_file_cost() + + def split(task: FileScanTask) -> List[FileScanTask]: + data_file = task.file + if not data_file.split_offsets: + return [task] + + split_offsets = data_file.split_offsets + if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)): + # split offsets are strictly ascending + return [task] + + all_tasks = [] + for i in range(len(split_offsets) - 1): + all_tasks.append( + FileScanTask(data_file, task.delete_files, split_offsets[i], split_offsets[i + 1] - split_offsets[i]) + ) + + all_tasks.append( + FileScanTask(data_file, task.delete_files, split_offsets[-1], data_file.file_size_in_bytes - split_offsets[-1]) + ) + + return all_tasks + + def weight_func(task: FileScanTask) -> int: + return max(task.size_in_bytes(), (1 + len(task.delete_files)) * open_file_cost) Review Comment: is the 1 here representing the data file? 
########## pyiceberg/table/__init__.py: ########## @@ -1423,6 +1451,66 @@ def plan_files(self) -> Iterable[FileScanTask]: for data_entry in data_entries ] + def _target_split_size(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, TableProperties.READ_SPLIT_SIZE, TableProperties.READ_SPLIT_SIZE_DEFAULT + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_SIZE, table_value) # type: ignore + + def _loop_back(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, TableProperties.READ_SPLIT_LOOKBACK, TableProperties.READ_SPLIT_LOOKBACK_DEFAULT + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_LOOKBACK, table_value) # type: ignore + + def _split_open_file_cost(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, + TableProperties.READ_SPLIT_OPEN_FILE_COST, + TableProperties.READ_SPLIT_OPEN_FILE_COST_DEFAULT, + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_OPEN_FILE_COST, table_value) # type: ignore + + def plan_task(self) -> Iterable[CombinedFileScanTask]: + """Plan balanced combined tasks for this scan by splitting large and combining small tasks. 
+ + Returns: + List of CombinedFileScanTasks + """ + split_size = self._target_split_size() + loop_back = self._loop_back() + open_file_cost = self._split_open_file_cost() + + def split(task: FileScanTask) -> List[FileScanTask]: + data_file = task.file + if not data_file.split_offsets: + return [task] + + split_offsets = data_file.split_offsets + if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)): + # split offsets are strictly ascending Review Comment: nit: ```suggestion # split offsets must be strictly ascending ``` according to the spec https://iceberg.apache.org/spec/#manifests ########## tests/integration/test_reads.py: ########## @@ -873,3 +873,48 @@ def test_table_scan_empty_table(catalog: Catalog) -> None: result_table = tbl.scan().to_arrow() assert len(result_table) == 0 + + +@pytest.mark.integration +def test_plan_tasks(session_catalog: Catalog) -> None: Review Comment: nit: should we also test `size_in_bytes` and the relationship between `READ_SPLIT_SIZE`, `READ_SPLIT_LOOKBACK`, and `READ_SPLIT_OPEN_FILE_COST`? 
########## pyiceberg/table/__init__.py: ########## @@ -191,6 +193,15 @@ class TableProperties: DELETE_MODE_MERGE_ON_READ = "merge-on-read" DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE + READ_SPLIT_SIZE = "read.split.target-size" Review Comment: looks like these are modelled after the Spark configs: https://github.com/apache/iceberg/blob/main/docs/docs/spark-configuration.md#read-options ########## pyiceberg/manifest.py: ########## @@ -105,6 +105,9 @@ def _missing_(cls, value: object) -> Union[None, str]: return member return None + def is_splittable(self) -> bool: + return self.name == "AVRO" or self.name == "PARQUET" or self.name == "ORC" Review Comment: also, this is not used ########## pyiceberg/manifest.py: ########## @@ -105,6 +105,9 @@ def _missing_(cls, value: object) -> Union[None, str]: return member return None + def is_splittable(self) -> bool: + return self.name == "AVRO" or self.name == "PARQUET" or self.name == "ORC" Review Comment: nit: use the `FileFormat` enum ########## tests/integration/test_reads.py: ########## @@ -873,3 +873,48 @@ def test_table_scan_empty_table(catalog: Catalog) -> None: result_table = tbl.scan().to_arrow() assert len(result_table) == 0 + + +@pytest.mark.integration +def test_plan_tasks(session_catalog: Catalog) -> None: + from pyiceberg.table import TableProperties + + table_name = "default.test_plan_tasks" + try: + session_catalog.drop_table(table_name) + except NoSuchTableError: + pass # Just to make sure that the table doesn't exist + + tbl = session_catalog.create_table( + table_name, + Schema( + NestedField(1, "number", LongType()), + ), + properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"}, + ) + + # Write 10 row groups, that should end up as 10 batches + entries = 10 + tbl.append( + pa.Table.from_pylist( + [ + { + "number": number, + } + for number in range(entries) + ], + ) + ) + Review Comment: nit: add an assert here that there are 10 files. 
perhaps using the `files` metadata table -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org