ConeyLiu commented on code in PR #1427: URL: https://github.com/apache/iceberg-python/pull/1427#discussion_r1886444111
########## pyiceberg/table/__init__.py: ########## @@ -1423,6 +1451,66 @@ def plan_files(self) -> Iterable[FileScanTask]: for data_entry in data_entries ] + def _target_split_size(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, TableProperties.READ_SPLIT_SIZE, TableProperties.READ_SPLIT_SIZE_DEFAULT + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_SIZE, table_value) # type: ignore + + def _loop_back(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, TableProperties.READ_SPLIT_LOOKBACK, TableProperties.READ_SPLIT_LOOKBACK_DEFAULT + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_LOOKBACK, table_value) # type: ignore + + def _split_open_file_cost(self) -> int: + table_value = property_as_int( + self.table_metadata.properties, + TableProperties.READ_SPLIT_OPEN_FILE_COST, + TableProperties.READ_SPLIT_OPEN_FILE_COST_DEFAULT, + ) + return property_as_int(self.options, TableProperties.READ_SPLIT_OPEN_FILE_COST, table_value) # type: ignore + + def plan_task(self) -> Iterable[CombinedFileScanTask]: + """Plan balanced combined tasks for this scan by splitting large and combining small tasks. + + Returns: + List of CombinedFileScanTasks + """ + split_size = self._target_split_size() + loop_back = self._loop_back() + open_file_cost = self._split_open_file_cost() + + def split(task: FileScanTask) -> List[FileScanTask]: + data_file = task.file + if not data_file.split_offsets: + return [task] + + split_offsets = data_file.split_offsets + if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)): + # split offsets are strictly ascending + return [task] + + all_tasks = [] + for i in range(len(split_offsets) - 1): + all_tasks.append( + FileScanTask(data_file, task.delete_files, split_offsets[i], split_offsets[i + 1] - split_offsets[i]) + ) + + all_tasks.append( + FileScanTask(data_file, task.delete_files, split_offsets[-1], data_file.file_size_in_bytes - split_offsets[-1]) + ) + + return all_tasks + + def weight_func(task: FileScanTask) -> int: + return max(task.size_in_bytes(), (1 + len(task.delete_files)) * open_file_cost) Review Comment: yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org