rdblue commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1456498171


##########
pyiceberg/table/__init__.py:
##########
@@ -1935,3 +2043,184 @@ def _generate_snapshot_id() -> int:
     snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
 
     return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+    write_uuid: uuid.UUID
+    task_id: int
+    df: pa.Table
+    sort_order_id: Optional[int] = None
+
+    # Later to be extended with partition information
+
+    def generate_data_file_filename(self, extension: str) -> str:
+        # Mimics the behavior in the Java API:
+        # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+    return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
+    # Mimics the behavior in Java:
+    # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+    return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+    from pyiceberg.io.pyarrow import write_file
+
+    if len(table.spec().fields) > 0:
+        raise ValueError("Cannot write to partitioned tables")
+
+    if len(table.sort_order().fields) > 0:
+        raise ValueError("Cannot write to tables with a sort-order")
+
+    write_uuid = uuid.uuid4()
+    counter = itertools.count(0)
+
+    # This is an iter, so we don't have to materialize everything every time
+    # This will be more relevant when we start doing partitioned writes
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]))
+
+
+class _MergeAppend:
+    _operation: Operation
+    _table: Table
+    _snapshot_id: int
+    _parent_snapshot_id: Optional[int]
+    _added_data_files: List[DataFile]
+    _commit_uuid: uuid.UUID
+
+    def __init__(self, operation: Operation, table: Table, snapshot_id: int) -> None:
+        self._operation = operation
+        self._table = table
+        self._snapshot_id = snapshot_id
+        # Since we only support the main branch for now
+        self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
+        self._added_data_files = []
+        self._commit_uuid = uuid.uuid4()
+
+    def append_data_file(self, data_file: DataFile) -> _MergeAppend:
+        self._added_data_files.append(data_file)
+        return self
+
+    def _deleted_entries(self) -> List[ManifestEntry]:
+        """To determine if we need to record any deleted entries.
+
+        With partial overwrites we have to use the predicate to evaluate
+        which entries are affected.
+        """
+        if self._operation == Operation.OVERWRITE:
+            if self._parent_snapshot_id is not None:
+                previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id)
+                if previous_snapshot is None:
+                    # This should never happen since you cannot overwrite an empty table
+                    raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
+
+                executor = ExecutorFactory.get_or_create()
+
+                def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
+                    return [
+                        ManifestEntry(
+                            status=ManifestEntryStatus.DELETED,
+                            snapshot_id=entry.snapshot_id,
+                            data_sequence_number=entry.data_sequence_number,
+                            file_sequence_number=entry.file_sequence_number,
+                            data_file=entry.data_file,
+                        )
+                        for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
+                    ]
+
+                list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io))
+                return list(chain(*list_of_entries))
+            return []
+        elif self._operation == Operation.APPEND:
+            return []
+        else:
+            raise ValueError(f"Not implemented for: {self._operation}")
+
+    def _manifests(self) -> List[ManifestFile]:
+        manifests = []
+        deleted_entries = self._deleted_entries()
+
+        if self._added_data_files:
+            output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
+            with write_manifest(
+                format_version=self._table.format_version,
+                spec=self._table.spec(),
+                schema=self._table.schema(),
+                output_file=self._table.io.new_output(output_file_location),
+                snapshot_id=self._snapshot_id,
+            ) as writer:
+                for data_file in self._added_data_files:
+                    writer.add_entry(
+                        ManifestEntry(
+                            status=ManifestEntryStatus.ADDED,
+                            snapshot_id=self._snapshot_id,
+                            data_sequence_number=None,
+                            file_sequence_number=None,
+                            data_file=data_file,
+                        )
+                    )
+
+                for delete_entry in deleted_entries:
+                    writer.add_entry(delete_entry)
+
+            manifests.append(writer.to_manifest_file())
+
+        return manifests
+
+    def _summary(self) -> Dict[str, str]:
+        ssc = SnapshotSummaryCollector()
+
+        for data_file in self._added_data_files:
+            ssc.add_file(data_file=data_file)
+
+        return ssc.build()
+
+    def commit(self) -> Snapshot:
+        new_manifests = self._manifests()
+        next_sequence_number = self._table.next_sequence_number()
+
+        previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None
+        summary = update_snapshot_summaries(
+            summary=Summary(operation=self._operation, **self._summary()),
+            previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
+            truncate_full_table=self._operation == Operation.OVERWRITE,
+        )
+
+        manifest_list_file_path = _generate_manifest_list_path(
+            location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
+        )
+        with write_manifest_list(
+            format_version=self._table.metadata.format_version,
+            output_file=self._table.io.new_output(manifest_list_file_path),
+            snapshot_id=self._snapshot_id,
+            parent_snapshot_id=self._parent_snapshot_id,
+            sequence_number=next_sequence_number,
+        ) as writer:
+            if self._operation == Operation.APPEND and previous_snapshot is not None:
+                # In case we want to append, just add the existing manifests
+                writer.add_manifests(previous_snapshot.manifests(io=self._table.io))
+            writer.add_manifests(new_manifests)

Review Comment:
   Similar to the note above, I think it would be cleaner to have `_manifests` 
produce the complete set of manifests, not just the replacement ones. That 
method already relies on `_deleted_entries` to produce deletes, so it may as 
well also be responsible for checking whether to include the existing manifests.
   
   Another option is to make `_manifests` produce just manifests for the 
appended files and handle deletes separately, but it looks like your approach 
here is to create just one manifest with both deletes and appends.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to