Fokko commented on code in PR #41: URL: https://github.com/apache/iceberg-python/pull/41#discussion_r1436371090
########## pyiceberg/table/__init__.py: ########## @@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_datafile_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: + return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: + # Mimics the behavior in Java: + # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: + from pyiceberg.io.pyarrow import write_file + + write_uuid = uuid.uuid4() + counter = itertools.count(0) + + # This is an iter, so we don't have to materialize everything every time + # This will be more relevant when we start doing partitioned writes + yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: + _operation: Operation + _table: Table + _snapshot_id: int + _parent_snapshot_id: Optional[int] + _added_datafiles: List[DataFile] + _existing_datafiles: List[DataFile] + _commit_uuid: uuid.UUID + + def __init__(self, operation: Operation, table: Table, snapshot_id: int, parent_snapshot_id: Optional[int]) -> None: + self._operation = operation 
+ self._table = table + self._snapshot_id = snapshot_id + self._parent_snapshot_id = parent_snapshot_id Review Comment: Hey, I like that! I've removed the `_parent_snapshot_id` as an argument. We might want to discuss how we would make new branches, but I agree that it is better to do that in a separate PR since there is enough going on there :) ########## pyiceberg/table/__init__.py: ########## @@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_datafile_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: + return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: + # Mimics the behavior in Java: + # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: + from pyiceberg.io.pyarrow import write_file + + write_uuid = uuid.uuid4() + counter = itertools.count(0) + + # This is an iter, so we don't have to materialize everything every time + # This will be more relevant when we start doing partitioned writes + yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class 
_MergeAppend: + _operation: Operation + _table: Table + _snapshot_id: int + _parent_snapshot_id: Optional[int] + _added_datafiles: List[DataFile] + _existing_datafiles: List[DataFile] + _commit_uuid: uuid.UUID + + def __init__(self, operation: Operation, table: Table, snapshot_id: int, parent_snapshot_id: Optional[int]) -> None: + self._operation = operation + self._table = table + self._snapshot_id = snapshot_id + self._parent_snapshot_id = parent_snapshot_id + self._added_datafiles = [] + self._existing_datafiles = [] + self._commit_uuid = uuid.uuid4() + + def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: + if added: + self._added_datafiles.append(data_file) + else: + self._existing_datafiles.append(data_file) + return self + + def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + """Rewrites a manifest file with a new snapshot-id. + + Args: + manifest_file: The existing manifest file + + Returns: + New manifest file with the current snapshot-id + """ + output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) + with write_manifest( + format_version=self._table.format_version, + spec=self._table.specs()[manifest_file.partition_spec_id], + schema=self._table.schema(), + output_file=self._table.io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for entry in manifest_file.fetch_manifest_entry(self._table.io, discard_deleted=True): + writer.add_entry(entry) + + return writer.to_manifest_file() + + def _manifests(self) -> Tuple[Dict[str, str], List[ManifestFile]]: + ssc = SnapshotSummaryCollector() + manifests = [] + + if self._added_datafiles: + output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid) + with write_manifest( + format_version=self._table.format_version, + spec=self._table.spec(), + schema=self._table.schema(), + 
output_file=self._table.io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for data_file in self._added_datafiles + self._existing_datafiles: + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.ADDED, + snapshot_id=self._snapshot_id, + data_sequence_number=None, + file_sequence_number=None, + data_file=data_file, + ) + ) + + for data_file in self._added_datafiles: + ssc.add_file(data_file=data_file) + + manifests.append(writer.to_manifest_file()) + + return ssc.build(), manifests + + def commit(self) -> Snapshot: + new_summary, manifests = self._manifests() + + previous_summary = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None Review Comment: Yes, that's correct! I've refactored this a couple of times, so the name is not up to date anymore. Thanks for catching this 👍 ########## pyiceberg/table/__init__.py: ########## @@ -1904,3 +2004,158 @@ def _generate_snapshot_id() -> int: snapshot_id = snapshot_id if snapshot_id >= 0 else snapshot_id * -1 return snapshot_id + + +@dataclass(frozen=True) +class WriteTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_datafile_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + +def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: + return f'{location}/metadata/{commit_uuid}-m{num}.avro' + + +def _generate_manifest_list_filename(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: + # Mimics the behavior in Java: + # 
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + + +def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]: + from pyiceberg.io.pyarrow import write_file + + write_uuid = uuid.uuid4() + counter = itertools.count(0) + + # This is an iter, so we don't have to materialize everything every time + # This will be more relevant when we start doing partitioned writes + yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)])) + + +class _MergeAppend: + _operation: Operation + _table: Table + _snapshot_id: int + _parent_snapshot_id: Optional[int] + _added_datafiles: List[DataFile] + _existing_datafiles: List[DataFile] + _commit_uuid: uuid.UUID + + def __init__(self, operation: Operation, table: Table, snapshot_id: int, parent_snapshot_id: Optional[int]) -> None: + self._operation = operation + self._table = table + self._snapshot_id = snapshot_id + self._parent_snapshot_id = parent_snapshot_id + self._added_datafiles = [] + self._existing_datafiles = [] + self._commit_uuid = uuid.uuid4() + + def append_datafile(self, data_file: DataFile, added: bool = True) -> _MergeAppend: + if added: + self._added_datafiles.append(data_file) + else: + self._existing_datafiles.append(data_file) + return self + + def _copy_manifest(self, manifest_file: ManifestFile) -> ManifestFile: Review Comment: Yes, this one should go. It is used for fast appends, but they caused a lot of issues on the Java side, therefore I think we should stick (for now at least) with just rewriting the manifests. ########## pyiceberg/io/pyarrow.py: ########## @@ -1447,18 +1452,15 @@ def parquet_path_to_id_mapping( def fill_parquet_file_metadata( - df: DataFile, + datafile: DataFile, Review Comment: Yes, that's an excellent suggestion! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org