syun64 commented on code in PR #363: URL: https://github.com/apache/iceberg-python/pull/363#discussion_r1626456000
########## pyiceberg/table/__init__.py: ########## @@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions) return table_partitions + + +class _ManifestMergeManager: + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + _snapshot_producer: _MergingSnapshotProducer + + def __init__( + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer + ) -> None: + self._target_size_bytes = target_size_bytes + self._min_count_to_merge = min_count_to_merge + self._merge_enabled = merge_enabled + self._snapshot_producer = snapshot_producer + + def _group_by_spec( + self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile] + ) -> Dict[int, List[ManifestFile]]: + groups = defaultdict(list) + groups[first_manifest.partition_spec_id].append(first_manifest) + for manifest in remaining_manifests: + groups[manifest.partition_spec_id].append(manifest) + return groups + + def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile: + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: + for manifest in manifest_bin: + for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): + if entry.status == ManifestEntryStatus.DELETED: + # suppress deletes from previous snapshots. 
only files deleted by this snapshot + # should be added to the new manifest + if entry.snapshot_id == self._snapshot_producer.snapshot_id: + writer.delete(entry) + elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id: + # adds from this snapshot are still adds, otherwise they should be existing + writer.add(entry) + else: + # add all files from the old manifest as existing files + writer.existing(entry) + + return writer.to_manifest_file() + + def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]: + packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) + bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) + + def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: + output_manifests = [] + if len(manifest_bin) == 1: + output_manifests.append(manifest_bin[0]) + elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: + # if the bin has the first manifest (the new data files or an appended manifest file) + # then only merge it Review Comment: nit: cleaning up the line breaks in this comment may be good here ########## pyiceberg/table/__init__.py: ########## @@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions) return table_partitions + + +class _ManifestMergeManager: + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + _snapshot_producer: _MergingSnapshotProducer + + def __init__( + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer + ) -> None: + self._target_size_bytes = target_size_bytes + self._min_count_to_merge = min_count_to_merge + self._merge_enabled = 
merge_enabled + self._snapshot_producer = snapshot_producer + + def _group_by_spec( + self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile] + ) -> Dict[int, List[ManifestFile]]: + groups = defaultdict(list) + groups[first_manifest.partition_spec_id].append(first_manifest) + for manifest in remaining_manifests: + groups[manifest.partition_spec_id].append(manifest) + return groups + + def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile: + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: + for manifest in manifest_bin: + for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): + if entry.status == ManifestEntryStatus.DELETED: + # suppress deletes from previous snapshots. only files deleted by this snapshot + # should be added to the new manifest + if entry.snapshot_id == self._snapshot_producer.snapshot_id: Review Comment: nit: I think we could condense this into the outer condition, consistent with how you did it in the line below ########## pyiceberg/table/__init__.py: ########## @@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions) return table_partitions + + +class _ManifestMergeManager: + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + _snapshot_producer: _MergingSnapshotProducer + + def __init__( + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer + ) -> None: + self._target_size_bytes = target_size_bytes + self._min_count_to_merge = min_count_to_merge + self._merge_enabled = merge_enabled + self._snapshot_producer = snapshot_producer + + def _group_by_spec( + self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile] Review Comment: nit: 
would it be simpler to just have a single ordered list of manifests as the argument for this function, since we don't have any special handling for the `first_manifest`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org