syun64 commented on code in PR #363:
URL: https://github.com/apache/iceberg-python/pull/363#discussion_r1626456000


##########
pyiceberg/table/__init__.py:
##########
@@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema: 
Schema, arrow_table: pa.T
     table_partitions: List[TablePartition] = 
_get_table_partitions(arrow_table, spec, schema, slice_instructions)
 
     return table_partitions
+
+
+class _ManifestMergeManager:
+    _target_size_bytes: int
+    _min_count_to_merge: int
+    _merge_enabled: bool
+    _snapshot_producer: _MergingSnapshotProducer
+
+    def __init__(
+        self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: 
bool, snapshot_producer: _MergingSnapshotProducer
+    ) -> None:
+        self._target_size_bytes = target_size_bytes
+        self._min_count_to_merge = min_count_to_merge
+        self._merge_enabled = merge_enabled
+        self._snapshot_producer = snapshot_producer
+
+    def _group_by_spec(
+        self, first_manifest: ManifestFile, remaining_manifests: 
List[ManifestFile]
+    ) -> Dict[int, List[ManifestFile]]:
+        groups = defaultdict(list)
+        groups[first_manifest.partition_spec_id].append(first_manifest)
+        for manifest in remaining_manifests:
+            groups[manifest.partition_spec_id].append(manifest)
+        return groups
+
+    def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) 
-> ManifestFile:
+        with 
self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id))
 as writer:
+            for manifest in manifest_bin:
+                for entry in 
self._snapshot_producer.fetch_manifest_entry(manifest=manifest, 
discard_deleted=False):
+                    if entry.status == ManifestEntryStatus.DELETED:
+                        #  suppress deletes from previous snapshots. only 
files deleted by this snapshot
+                        #                should be added to the new manifest
+                        if entry.snapshot_id == 
self._snapshot_producer.snapshot_id:
+                            writer.delete(entry)
+                    elif entry.status == ManifestEntryStatus.ADDED and 
entry.snapshot_id == self._snapshot_producer.snapshot_id:
+                        # adds from this snapshot are still adds, otherwise 
they should be existing
+                        writer.add(entry)
+                    else:
+                        # add all files from the old manifest as existing files
+                        writer.existing(entry)
+
+        return writer.to_manifest_file()
+
+    def _merge_group(self, first_manifest: ManifestFile, spec_id: int, 
manifests: List[ManifestFile]) -> List[ManifestFile]:
+        packer: ListPacker[ManifestFile] = 
ListPacker(target_weight=self._target_size_bytes, lookback=1, 
largest_bin_first=False)
+        bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: 
m.manifest_length)
+
+        def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
+            output_manifests = []
+            if len(manifest_bin) == 1:
+                output_manifests.append(manifest_bin[0])
+            elif first_manifest in manifest_bin and len(manifest_bin) < 
self._min_count_to_merge:
+                #  if the bin has the first manifest (the new data files or an 
appended manifest file)
+                #  then only merge it

Review Comment:
   nit: this comment could use some cleanup — removing the extra spaces after `#` and re-wrapping the lines would make it easier to read.



##########
pyiceberg/table/__init__.py:
##########
@@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema: 
Schema, arrow_table: pa.T
     table_partitions: List[TablePartition] = 
_get_table_partitions(arrow_table, spec, schema, slice_instructions)
 
     return table_partitions
+
+
+class _ManifestMergeManager:
+    _target_size_bytes: int
+    _min_count_to_merge: int
+    _merge_enabled: bool
+    _snapshot_producer: _MergingSnapshotProducer
+
+    def __init__(
+        self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: 
bool, snapshot_producer: _MergingSnapshotProducer
+    ) -> None:
+        self._target_size_bytes = target_size_bytes
+        self._min_count_to_merge = min_count_to_merge
+        self._merge_enabled = merge_enabled
+        self._snapshot_producer = snapshot_producer
+
+    def _group_by_spec(
+        self, first_manifest: ManifestFile, remaining_manifests: 
List[ManifestFile]
+    ) -> Dict[int, List[ManifestFile]]:
+        groups = defaultdict(list)
+        groups[first_manifest.partition_spec_id].append(first_manifest)
+        for manifest in remaining_manifests:
+            groups[manifest.partition_spec_id].append(manifest)
+        return groups
+
+    def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) 
-> ManifestFile:
+        with 
self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id))
 as writer:
+            for manifest in manifest_bin:
+                for entry in 
self._snapshot_producer.fetch_manifest_entry(manifest=manifest, 
discard_deleted=False):
+                    if entry.status == ManifestEntryStatus.DELETED:
+                        #  suppress deletes from previous snapshots. only 
files deleted by this snapshot
+                        #                should be added to the new manifest
+                        if entry.snapshot_id == 
self._snapshot_producer.snapshot_id:

Review Comment:
   nit: I think we could fold the `entry.snapshot_id == self._snapshot_producer.snapshot_id` check into the outer condition, consistent with how you did it in the `elif` below.



##########
pyiceberg/table/__init__.py:
##########
@@ -3735,3 +3894,92 @@ def _determine_partitions(spec: PartitionSpec, schema: 
Schema, arrow_table: pa.T
     table_partitions: List[TablePartition] = 
_get_table_partitions(arrow_table, spec, schema, slice_instructions)
 
     return table_partitions
+
+
+class _ManifestMergeManager:
+    _target_size_bytes: int
+    _min_count_to_merge: int
+    _merge_enabled: bool
+    _snapshot_producer: _MergingSnapshotProducer
+
+    def __init__(
+        self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: 
bool, snapshot_producer: _MergingSnapshotProducer
+    ) -> None:
+        self._target_size_bytes = target_size_bytes
+        self._min_count_to_merge = min_count_to_merge
+        self._merge_enabled = merge_enabled
+        self._snapshot_producer = snapshot_producer
+
+    def _group_by_spec(
+        self, first_manifest: ManifestFile, remaining_manifests: 
List[ManifestFile]

Review Comment:
   nit: would it be simpler for this function to take a single ordered list of manifests as its argument, since we don't do any special handling of `first_manifest` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to