HonahX commented on code in PR #41:
URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1435031878
##########
pyiceberg/table/__init__.py:
##########
@@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int:
snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+ write_uuid: uuid.UUID
+ task_id: int
+ df: pa.Table
+ sort_order_id: Optional[int] = None
+
+ # Later to be extended with partition information
+
+ def generate_datafile_filename(self, extension: str) -> str:
+ # Mimics the behavior in the Java API:
+ #
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+ return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+ return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int,
commit_uuid: uuid.UUID) -> str:
+ # Mimics the behavior in Java:
+ #
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+ return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+ from pyiceberg.io.pyarrow import write_file
+
+ write_uuid = uuid.uuid4()
+ counter = itertools.count(0)
+
+ # This is an iter, so we don't have to materialize everything every time
+ # This will be more relevant when we start doing partitioned writes
+ yield from write_file(table, iter([WriteTask(write_uuid, next(counter),
df)]))
+
+
+class _MergeAppend:
+ _operation: Operation
+ _table: Table
+ _snapshot_id: int
+ _parent_snapshot_id: Optional[int]
+ _added_datafiles: List[DataFile]
+ _existing_datafiles: List[DataFile]
+ _commit_uuid: uuid.UUID
+
+ def __init__(self, operation: Operation, table: Table, snapshot_id: int,
parent_snapshot_id: Optional[int]) -> None:
+ self._operation = operation
+ self._table = table
+ self._snapshot_id = snapshot_id
+ self._parent_snapshot_id = parent_snapshot_id
Review Comment:
Instead of passing `parent_snapshot_id` as an argument, I wonder if we could
replace it by a `branch_name: str = "main"` :
```python
def __init__(self, operation: Operation, table: Table, snapshot_id: int,
branch_name: str = "main") -> None:
...
self._parent_snapshot = self._table.current_snapshot() # only support main
in the first version
self._parent_snapshot_id = self._parent_snapshot.snapshot_id if
self._parent_snapshot else None
```
In this way we only do `snapshot_by_id` once and can avoid duplicate code
and None check in other places.
When adding branch support later, we could replace
`self._table.current_snapshot()` by something similar to
https://github.com/apache/iceberg/blob/a4d47567e1fef44f4443250537f09dc73a1f7583/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L493-L503
Does this sound reasonable to you? May be this can be discussed later in the
PR for branch support.
##########
pyiceberg/table/__init__.py:
##########
@@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int:
snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+ write_uuid: uuid.UUID
+ task_id: int
+ df: pa.Table
+ sort_order_id: Optional[int] = None
+
+ # Later to be extended with partition information
+
+ def generate_datafile_filename(self, extension: str) -> str:
+ # Mimics the behavior in the Java API:
+ #
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+ return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+ return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int,
commit_uuid: uuid.UUID) -> str:
+ # Mimics the behavior in Java:
+ #
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+ return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+ from pyiceberg.io.pyarrow import write_file
+
+ write_uuid = uuid.uuid4()
+ counter = itertools.count(0)
+
+ # This is an iter, so we don't have to materialize everything every time
+ # This will be more relevant when we start doing partitioned writes
+ yield from write_file(table, iter([WriteTask(write_uuid, next(counter),
df)]))
+
+
+class _MergeAppend:
+ _operation: Operation
+ _table: Table
+ _snapshot_id: int
+ _parent_snapshot_id: Optional[int]
+ _added_datafiles: List[DataFile]
+ _existing_datafiles: List[DataFile]
+ _commit_uuid: uuid.UUID
+
+ def __init__(self, operation: Operation, table: Table, snapshot_id: int,
parent_snapshot_id: Optional[int]) -> None:
+ self._operation = operation
+ self._table = table
+ self._snapshot_id = snapshot_id
+ self._parent_snapshot_id = parent_snapshot_id
+ self._added_datafiles = []
+ self._existing_datafiles = []
+ self._commit_uuid = uuid.uuid4()
+
+ def append_datafile(self, data_file: DataFile, added: bool = True) ->
_MergeAppend:
+ if added:
+ self._added_datafiles.append(data_file)
+ else:
+ self._existing_datafiles.append(data_file)
+ return self
+
+ def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
Review Comment:
Looks like this is not used. Out of curiosity, will there be any use case of
this in the future? I think we choose to append existing data files instead of
copying existing manifest files
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1447,18 +1452,15 @@ def parquet_path_to_id_mapping(
def fill_parquet_file_metadata(
- df: DataFile,
+ datafile: DataFile,
Review Comment:
```suggestion
data_file: DataFile,
```
Shall we rename this to `data_file`?
##########
pyiceberg/table/__init__.py:
##########
@@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int:
snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1
return snapshot_id
+
+
+@dataclass(frozen=True)
+class WriteTask:
+ write_uuid: uuid.UUID
+ task_id: int
+ df: pa.Table
+ sort_order_id: Optional[int] = None
+
+ # Later to be extended with partition information
+
+ def generate_datafile_filename(self, extension: str) -> str:
+ # Mimics the behavior in the Java API:
+ #
https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
+ return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+
+
+def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
+ return f'{location}/metadata/{commit_uuid}-m{num}.avro'
+
+
+def _generate_manifest_list_filename(snapshot_id: int, attempt: int,
commit_uuid: uuid.UUID) -> str:
+ # Mimics the behavior in Java:
+ #
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+ return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
+
+
+def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
+ from pyiceberg.io.pyarrow import write_file
+
+ write_uuid = uuid.uuid4()
+ counter = itertools.count(0)
+
+ # This is an iter, so we don't have to materialize everything every time
+ # This will be more relevant when we start doing partitioned writes
+ yield from write_file(table, iter([WriteTask(write_uuid, next(counter),
df)]))
+
+
+class _MergeAppend:
+ _operation: Operation
+ _table: Table
+ _snapshot_id: int
+ _parent_snapshot_id: Optional[int]
+ _added_datafiles: List[DataFile]
+ _existing_datafiles: List[DataFile]
+ _commit_uuid: uuid.UUID
+
+ def __init__(self, operation: Operation, table: Table, snapshot_id: int,
parent_snapshot_id: Optional[int]) -> None:
+ self._operation = operation
+ self._table = table
+ self._snapshot_id = snapshot_id
+ self._parent_snapshot_id = parent_snapshot_id
+ self._added_datafiles = []
+ self._existing_datafiles = []
+ self._commit_uuid = uuid.uuid4()
+
+ def append_datafile(self, data_file: DataFile, added: bool = True) ->
_MergeAppend:
+ if added:
+ self._added_datafiles.append(data_file)
+ else:
+ self._existing_datafiles.append(data_file)
+ return self
+
+ def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+ """Rewrites a manifest file with a new snapshot-id.
+
+ Args:
+ manifest_file: The existing manifest file
+
+ Returns:
+ New manifest file with the current snapshot-id
+ """
+ output_file_location =
_new_manifest_path(location=self._table.location(), num=0,
commit_uuid=self._commit_uuid)
+ with write_manifest(
+ format_version=self._table.format_version,
+ spec=self._table.specs()[manifest_file.partition_spec_id],
+ schema=self._table.schema(),
+ output_file=self._table.io.new_output(output_file_location),
+ snapshot_id=self._snapshot_id,
+ ) as writer:
+ for entry in manifest_file.fetch_manifest_entry(self._table.io,
discard_deleted=True):
+ writer.add_entry(entry)
+
+ return writer.to_manifest_file()
+
+ def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]:
+ ssc = SnapshotSummaryCollector()
+ manifests = []
+
+ if self._added_datafiles:
+ output_file_location =
_new_manifest_path(location=self._table.location(), num=0,
commit_uuid=self._commit_uuid)
+ with write_manifest(
+ format_version=self._table.format_version,
+ spec=self._table.spec(),
+ schema=self._table.schema(),
+ output_file=self._table.io.new_output(output_file_location),
+ snapshot_id=self._snapshot_id,
+ ) as writer:
+ for data_file in self._added_datafiles +
self._existing_datafiles:
+ writer.add_entry(
+ ManifestEntry(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=self._snapshot_id,
+ data_sequence_number=None,
+ file_sequence_number=None,
+ data_file=data_file,
+ )
+ )
+
+ for data_file in self._added_datafiles:
+ ssc.add_file(data_file=data_file)
+
+ manifests.append(writer.to_manifest_file())
+
+ return ssc.build(), manifests
+
+ def commit(self) -> Snapshot:
+ new_summary, manifests = self._manifests()
+
+ previous_summary =
self._table.snapshot_by_id(self._parent_snapshot_id) if
self._parent_snapshot_id is not None else None
Review Comment:
```suggestion
previous_snapshot =
self._table.snapshot_by_id(self._parent_snapshot_id) if
self._parent_snapshot_id is not None else None
```
Although this is only for extracting the previous summary, I think it might
be good to use `_snapshot` as its name to be clear
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]