ConeyLiu commented on code in PR #1427:
URL: https://github.com/apache/iceberg-python/pull/1427#discussion_r1886444111
##########
pyiceberg/table/__init__.py:
##########
@@ -1423,6 +1451,66 @@ def plan_files(self) -> Iterable[FileScanTask]:
for data_entry in data_entries
]
+ def _target_split_size(self) -> int:
+ table_value = property_as_int(
+ self.table_metadata.properties, TableProperties.READ_SPLIT_SIZE, TableProperties.READ_SPLIT_SIZE_DEFAULT
+ )
+ return property_as_int(self.options, TableProperties.READ_SPLIT_SIZE, table_value) # type: ignore
+
+ def _loop_back(self) -> int:
+ table_value = property_as_int(
+ self.table_metadata.properties, TableProperties.READ_SPLIT_LOOKBACK, TableProperties.READ_SPLIT_LOOKBACK_DEFAULT
+ )
+ return property_as_int(self.options, TableProperties.READ_SPLIT_LOOKBACK, table_value) # type: ignore
+
+ def _split_open_file_cost(self) -> int:
+ table_value = property_as_int(
+ self.table_metadata.properties,
+ TableProperties.READ_SPLIT_OPEN_FILE_COST,
+ TableProperties.READ_SPLIT_OPEN_FILE_COST_DEFAULT,
+ )
+ return property_as_int(self.options, TableProperties.READ_SPLIT_OPEN_FILE_COST, table_value) # type: ignore
+
+ def plan_task(self) -> Iterable[CombinedFileScanTask]:
+ """Plan balanced combined tasks for this scan by splitting large and combining small tasks.
+
+ Returns:
+ List of CombinedFileScanTasks
+ """
+ split_size = self._target_split_size()
+ loop_back = self._loop_back()
+ open_file_cost = self._split_open_file_cost()
+
+ def split(task: FileScanTask) -> List[FileScanTask]:
+ data_file = task.file
+ if not data_file.split_offsets:
+ return [task]
+
+ split_offsets = data_file.split_offsets
+ if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)):
+ # split offsets are strictly ascending
+ return [task]
+
+ all_tasks = []
+ for i in range(len(split_offsets) - 1):
+ all_tasks.append(
+ FileScanTask(data_file, task.delete_files, split_offsets[i], split_offsets[i + 1] - split_offsets[i])
+ )
+
+ all_tasks.append(
+ FileScanTask(data_file, task.delete_files, split_offsets[-1], data_file.file_size_in_bytes - split_offsets[-1])
+ )
+
+ return all_tasks
+
+ def weight_func(task: FileScanTask) -> int:
+ return max(task.size_in_bytes(), (1 + len(task.delete_files)) * open_file_cost)
Review Comment:
yes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]