Fokko commented on code in PR #8012:
URL: https://github.com/apache/iceberg/pull/8012#discussion_r1257964206


##########
python/pyiceberg/manifest.py:
##########
@@ -363,3 +459,232 @@ def _inherit_sequence_number(entry: ManifestEntry, 
manifest: ManifestFile) -> Ma
         entry.file_sequence_number = manifest.sequence_number
 
     return entry
+
+
+class ManifestWriter(ABC):
+    closed: bool
+    _spec: PartitionSpec
+    _output_file: OutputFile
+    _writer: AvroOutputFile[ManifestEntry]
+    _snapshot_id: int
+    _meta: Dict[str, str]
+    _added_files: int
+    _added_rows: int
+    _existing_files: int
+    _existing_rows: int
+    _deleted_files: int
+    _deleted_rows: int
+    _min_data_sequence_number: Optional[int]
+    _partition_summary: PartitionSummary
+
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int, meta: Dict[str, str]):
+        self.closed = False
+        self._spec = spec
+        self._output_file = output_file
+        self._snapshot_id = snapshot_id
+        self._meta = meta
+
+        self._added_files = 0
+        self._added_rows = 0
+        self._existing_files = 0
+        self._existing_rows = 0
+        self._deleted_files = 0
+        self._deleted_rows = 0
+        self._min_data_sequence_number = None
+        self._partition_summary = PartitionSummary(spec, schema)
+        self._manifest_entry_schema = 
manifest_entry_schema(data_file_type(spec.partition_type(schema)))
+
+    def __enter__(self) -> ManifestWriter:
+        """Opens the writer."""
+        self._writer = AvroOutputFile[ManifestEntry](self._output_file, 
self._manifest_entry_schema, "manifest_entry", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Closes the writer."""
+        self.closed = True
+        self._writer.__exit__(exc_type, exc_value, traceback)
+
+    @abstractmethod
+    def content(self) -> ManifestContent:
+        ...
+
+    def to_manifest_file(self) -> ManifestFile:
+        """Returns the manifest file."""
+        # once the manifest file is generated, no more entries can be added
+        self.closed = True
+        min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
+        return ManifestFile(
+            manifest_path=self._output_file.location,
+            manifest_length=len(self._writer.output_file),
+            partition_spec_id=self._spec.spec_id,
+            content=self.content(),
+            sequence_number=UNASSIGNED_SEQ,
+            min_sequence_number=min_sequence_number,
+            added_snapshot_id=self._snapshot_id,
+            added_files_count=self._added_files,
+            existing_files_count=self._existing_files,
+            deleted_files_count=self._deleted_files,
+            added_rows_count=self._added_rows,
+            existing_rows_count=self._existing_rows,
+            deleted_rows_count=self._deleted_rows,
+            partitions=self._partition_summary.summaries(),
+            key_metadatas=None,
+        )
+
+    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
+        if self.closed:
+            raise RuntimeError("Cannot add entry to closed manifest writer")
+        if entry.status == ManifestEntryStatus.ADDED:
+            self._added_files += 1
+            self._added_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.EXISTING:
+            self._existing_files += 1
+            self._existing_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.DELETED:
+            self._deleted_files += 1
+            self._deleted_rows += entry.data_file.record_count
+
+        self._partition_summary.update(entry.data_file.partition)
+
+        if (
+            (entry.status == ManifestEntryStatus.ADDED or entry.status == 
ManifestEntryStatus.EXISTING)
+            and entry.data_sequence_number is not None
+            and (self._min_data_sequence_number is None or 
entry.data_sequence_number < self._min_data_sequence_number)
+        ):
+            self._min_data_sequence_number = entry.data_sequence_number
+
+        self._writer.write_block([entry])
+        return self
+
+
+class ManifestWriterV1(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "1",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+
+class ManifestWriterV2(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "2",
+                "content": "data",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+
+def write_manifest(
+    format_version: int, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int
+) -> ManifestWriter:
+    if format_version == 1:
+        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
+    elif format_version == 2:
+        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
+    else:
+        # TODO: replace it with UnsupportedOperationException

Review Comment:
   I think a `ValueError` is reasonable.



##########
python/pyiceberg/manifest.py:
##########
@@ -363,3 +459,232 @@ def _inherit_sequence_number(entry: ManifestEntry, 
manifest: ManifestFile) -> Ma
         entry.file_sequence_number = manifest.sequence_number
 
     return entry
+
+
+class ManifestWriter(ABC):
+    closed: bool
+    _spec: PartitionSpec
+    _output_file: OutputFile
+    _writer: AvroOutputFile[ManifestEntry]
+    _snapshot_id: int
+    _meta: Dict[str, str]
+    _added_files: int
+    _added_rows: int
+    _existing_files: int
+    _existing_rows: int
+    _deleted_files: int
+    _deleted_rows: int
+    _min_data_sequence_number: Optional[int]
+    _partition_summary: PartitionSummary
+
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int, meta: Dict[str, str]):
+        self.closed = False
+        self._spec = spec
+        self._output_file = output_file
+        self._snapshot_id = snapshot_id
+        self._meta = meta
+
+        self._added_files = 0
+        self._added_rows = 0
+        self._existing_files = 0
+        self._existing_rows = 0
+        self._deleted_files = 0
+        self._deleted_rows = 0
+        self._min_data_sequence_number = None
+        self._partition_summary = PartitionSummary(spec, schema)
+        self._manifest_entry_schema = 
manifest_entry_schema(data_file_type(spec.partition_type(schema)))
+
+    def __enter__(self) -> ManifestWriter:
+        """Opens the writer."""
+        self._writer = AvroOutputFile[ManifestEntry](self._output_file, 
self._manifest_entry_schema, "manifest_entry", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Closes the writer."""
+        self.closed = True
+        self._writer.__exit__(exc_type, exc_value, traceback)
+
+    @abstractmethod
+    def content(self) -> ManifestContent:
+        ...
+
+    def to_manifest_file(self) -> ManifestFile:
+        """Returns the manifest file."""
+        # once the manifest file is generated, no more entries can be added
+        self.closed = True
+        min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
+        return ManifestFile(
+            manifest_path=self._output_file.location,
+            manifest_length=len(self._writer.output_file),
+            partition_spec_id=self._spec.spec_id,
+            content=self.content(),
+            sequence_number=UNASSIGNED_SEQ,
+            min_sequence_number=min_sequence_number,
+            added_snapshot_id=self._snapshot_id,
+            added_files_count=self._added_files,
+            existing_files_count=self._existing_files,
+            deleted_files_count=self._deleted_files,
+            added_rows_count=self._added_rows,
+            existing_rows_count=self._existing_rows,
+            deleted_rows_count=self._deleted_rows,
+            partitions=self._partition_summary.summaries(),
+            key_metadatas=None,
+        )
+
+    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
+        if self.closed:
+            raise RuntimeError("Cannot add entry to closed manifest writer")
+        if entry.status == ManifestEntryStatus.ADDED:
+            self._added_files += 1
+            self._added_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.EXISTING:
+            self._existing_files += 1
+            self._existing_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.DELETED:
+            self._deleted_files += 1
+            self._deleted_rows += entry.data_file.record_count
+
+        self._partition_summary.update(entry.data_file.partition)
+
+        if (
+            (entry.status == ManifestEntryStatus.ADDED or entry.status == 
ManifestEntryStatus.EXISTING)
+            and entry.data_sequence_number is not None
+            and (self._min_data_sequence_number is None or 
entry.data_sequence_number < self._min_data_sequence_number)
+        ):
+            self._min_data_sequence_number = entry.data_sequence_number
+
+        self._writer.write_block([entry])
+        return self
+
+
+class ManifestWriterV1(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "1",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+
+class ManifestWriterV2(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "2",
+                "content": "data",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+
+def write_manifest(
+    format_version: int, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int

Review Comment:
   ```suggestion
       format_version: Literal[1, 2], spec: PartitionSpec, schema: Schema, 
output_file: OutputFile, snapshot_id: int
   ```



##########
python/pyiceberg/manifest.py:
##########
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive 
type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:

Review Comment:
   Do we need to return `PartitionFieldStats`? We don't use it below.



##########
python/pyiceberg/manifest.py:
##########
@@ -78,89 +94,106 @@ def __repr__(self) -> str:
         """Returns the string representation of the FileFormat class."""
         return f"FileFormat.{self.name}"
 
-
-DATA_FILE_TYPE = StructType(
-    NestedField(
-        field_id=134,
-        name="content",
-        field_type=IntegerType(),
-        required=False,
-        doc="Contents of the file: 0=data, 1=position deletes, 2=equality 
deletes",
-        initial_default=DataFileContent.DATA,
-    ),
-    NestedField(field_id=100, name="file_path", field_type=StringType(), 
required=True, doc="Location URI with FS scheme"),
-    NestedField(
-        field_id=101, name="file_format", field_type=StringType(), 
required=True, doc="File format name: avro, orc, or parquet"
-    ),
-    NestedField(
-        field_id=102,
-        name="partition",
-        field_type=StructType(),
-        required=True,
-        doc="Partition data tuple, schema based on the partition spec",
-    ),
-    NestedField(field_id=103, name="record_count", field_type=LongType(), 
required=True, doc="Number of records in the file"),
-    NestedField(field_id=104, name="file_size_in_bytes", 
field_type=LongType(), required=True, doc="Total file size in bytes"),
-    NestedField(
-        field_id=108,
-        name="column_sizes",
-        field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to total size on disk",
-    ),
-    NestedField(
-        field_id=109,
-        name="value_counts",
-        field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to total count, including null and NaN",
-    ),
-    NestedField(
-        field_id=110,
-        name="null_value_counts",
-        field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to null value count",
-    ),
-    NestedField(
-        field_id=137,
-        name="nan_value_counts",
-        field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to number of NaN values in the column",
-    ),
-    NestedField(
-        field_id=125,
-        name="lower_bounds",
-        field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, 
value_type=BinaryType()),
-        required=False,
-        doc="Map of column id to lower bound",
-    ),
-    NestedField(
-        field_id=128,
-        name="upper_bounds",
-        field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, 
value_type=BinaryType()),
-        required=False,
-        doc="Map of column id to upper bound",
-    ),
-    NestedField(field_id=131, name="key_metadata", field_type=BinaryType(), 
required=False, doc="Encryption key metadata blob"),
-    NestedField(
-        field_id=132,
-        name="split_offsets",
-        field_type=ListType(element_id=133, element_type=LongType(), 
element_required=True),
-        required=False,
-        doc="Splittable offsets",
-    ),
-    NestedField(
-        field_id=135,
-        name="equality_ids",
-        field_type=ListType(element_id=136, element_type=LongType(), 
element_required=True),
-        required=False,
-        doc="Equality comparison field IDs",
-    ),
-    NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), 
required=False, doc="Sort order ID"),
-    NestedField(field_id=141, name="spec_id", field_type=IntegerType(), 
required=False, doc="Partition spec ID"),
-)
+    def add_extension(self, filename: str) -> str:

Review Comment:
   We probably also want to add the compression to it (e.g. `.zstd.parquet`). Do 
we want to use a `Literal` here?
   ```suggestion
       def add_extension(self, format: Literal['parquet', 'orc', 'avro']) -> 
str:
   ```



##########
python/pyiceberg/manifest.py:
##########
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive 
type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:
+        if value is None:
+            self._contains_null = True
+        elif math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            # TODO: may need to implement a custom comparator for incompatible 
types
+            elif value < self._min:
+                self._min = value
+            elif value > self._max:
+                self._max = value
+        return self
+
+
+class PartitionSummary:
+    _fields: List[PartitionFieldStats]
+    _types: List[IcebergType]
+
+    def __init__(self, spec: PartitionSpec, schema: Schema):
+        self._types = [field.field_type for field in 
spec.partition_type(schema).fields]
+        self._fields = [PartitionFieldStats(field_type) for field_type in 
self._types]
+
+    def summaries(self) -> List[PartitionFieldSummary]:
+        return [field.to_summary() for field in self._fields]
+
+    def update(self, partition_keys: Record) -> PartitionSummary:
+        for i, field_type in enumerate(self._types):
+            assert isinstance(field_type, PrimitiveType), f"Expected a 
primitive type for the partition field, got {field_type}"
+            partition_key = partition_keys[i]
+            self._fields[i].update(conversions.partition_to_py(field_type, 
partition_key))

Review Comment:
   I'm not sure about the `partition_to_py`:
   ```json
   {
        "name": "partition",
        "type": {
                "type": "record",
                "name": "r102",
                "fields": [{
                        "name": "tpep_pickup_datetime_day",
                        "type": ["null", {
                                "type": "int",
                                "logicalType": "date"
                        }],
                        "default": null,
                        "field-id": 1000
                }]
        },
        "field-id": 102
   }
   ```
   
   It looks like this is encoded as an int. 



##########
python/pyiceberg/manifest.py:
##########
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive 
type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:
+        if value is None:
+            self._contains_null = True
+        elif math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            # TODO: may need to implement a custom comparator for incompatible 
types
+            elif value < self._min:

Review Comment:
   I would use Python's built-in `min` and `max`.



##########
python/pyiceberg/manifest.py:
##########
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive 
type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:
+        if value is None:
+            self._contains_null = True
+        elif math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            # TODO: may need to implement a custom comparator for incompatible 
types
+            elif value < self._min:
+                self._min = value
+            elif value > self._max:
+                self._max = value
+        return self
+
+
+class PartitionSummary:
+    _fields: List[PartitionFieldStats]

Review Comment:
   To me, the name `_fields` reads like a `List[IcebergType]`. Small suggestion:
   ```suggestion
       _field_stats: List[PartitionFieldStats]
   ```
   Nit: The order in the init is different from the order of the class fields.



##########
python/pyiceberg/manifest.py:
##########
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive 
type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:
+        if value is None:
+            self._contains_null = True
+        elif math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            # TODO: may need to implement a custom comparator for incompatible 
types
+            elif value < self._min:
+                self._min = value
+            elif value > self._max:
+                self._max = value
+        return self
+
+
+class PartitionSummary:
+    _fields: List[PartitionFieldStats]
+    _types: List[IcebergType]
+
+    def __init__(self, spec: PartitionSpec, schema: Schema):
+        self._types = [field.field_type for field in 
spec.partition_type(schema).fields]
+        self._fields = [PartitionFieldStats(field_type) for field_type in 
self._types]
+
+    def summaries(self) -> List[PartitionFieldSummary]:
+        return [field.to_summary() for field in self._fields]
+
+    def update(self, partition_keys: Record) -> PartitionSummary:
+        for i, field_type in enumerate(self._types):
+            assert isinstance(field_type, PrimitiveType), f"Expected a 
primitive type for the partition field, got {field_type}"

Review Comment:
   Can you replace this assert with a `ValueError`? We try to avoid using 
asserts outside of `tests/`



##########
python/pyiceberg/manifest.py:
##########
@@ -78,89 +94,106 @@ def __repr__(self) -> str:
         """Returns the string representation of the FileFormat class."""
         return f"FileFormat.{self.name}"
 
-
-DATA_FILE_TYPE = StructType(
-    NestedField(
-        field_id=134,
-        name="content",
-        field_type=IntegerType(),
-        required=False,
-        doc="Contents of the file: 0=data, 1=position deletes, 2=equality 
deletes",
-        initial_default=DataFileContent.DATA,
-    ),
-    NestedField(field_id=100, name="file_path", field_type=StringType(), 
required=True, doc="Location URI with FS scheme"),
-    NestedField(
-        field_id=101, name="file_format", field_type=StringType(), 
required=True, doc="File format name: avro, orc, or parquet"
-    ),
-    NestedField(
-        field_id=102,
-        name="partition",
-        field_type=StructType(),
-        required=True,
-        doc="Partition data tuple, schema based on the partition spec",
-    ),
-    NestedField(field_id=103, name="record_count", field_type=LongType(), 
required=True, doc="Number of records in the file"),
-    NestedField(field_id=104, name="file_size_in_bytes", 
field_type=LongType(), required=True, doc="Total file size in bytes"),
-    NestedField(
-        field_id=108,
-        name="column_sizes",
-        field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to total size on disk",
-    ),
-    NestedField(
-        field_id=109,
-        name="value_counts",
-        field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to total count, including null and NaN",
-    ),
-    NestedField(
-        field_id=110,
-        name="null_value_counts",
-        field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to null value count",
-    ),
-    NestedField(
-        field_id=137,
-        name="nan_value_counts",
-        field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, 
value_type=LongType()),
-        required=False,
-        doc="Map of column id to number of NaN values in the column",
-    ),
-    NestedField(
-        field_id=125,
-        name="lower_bounds",
-        field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, 
value_type=BinaryType()),
-        required=False,
-        doc="Map of column id to lower bound",
-    ),
-    NestedField(
-        field_id=128,
-        name="upper_bounds",
-        field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, 
value_type=BinaryType()),
-        required=False,
-        doc="Map of column id to upper bound",
-    ),
-    NestedField(field_id=131, name="key_metadata", field_type=BinaryType(), 
required=False, doc="Encryption key metadata blob"),
-    NestedField(
-        field_id=132,
-        name="split_offsets",
-        field_type=ListType(element_id=133, element_type=LongType(), 
element_required=True),
-        required=False,
-        doc="Splittable offsets",
-    ),
-    NestedField(
-        field_id=135,
-        name="equality_ids",
-        field_type=ListType(element_id=136, element_type=LongType(), 
element_required=True),
-        required=False,
-        doc="Equality comparison field IDs",
-    ),
-    NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), 
required=False, doc="Sort order ID"),
-    NestedField(field_id=141, name="spec_id", field_type=IntegerType(), 
required=False, doc="Partition spec ID"),
-)
+    def add_extension(self, filename: str) -> str:
+        if filename.endswith(f".{self.name.lower()}"):
+            return filename
+        return f"{filename}.{self.name.lower()}"
+
+
+def data_file_type(partition_type: StructType) -> StructType:

Review Comment:
   I'm not a fan of this one, but I see why it is necessary. For reading, we 
can override certain field IDs:
   
https://github.com/apache/iceberg/blob/e389e4d139624a49729379acd330dd9c96187b04/python/pyiceberg/manifest.py#L319-L335
   
   We could do the same when writing. We can override field-id `102` when 
constructing the writer. WDYT?



##########
python/pyiceberg/manifest.py:
##########
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive 
type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:
+        if value is None:
+            self._contains_null = True
+        elif math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            # TODO: may need to implement a custom comparator for incompatible 
types
+            elif value < self._min:
+                self._min = value
+            elif value > self._max:
+                self._max = value
+        return self
+
+
+class PartitionSummary:
+    _fields: List[PartitionFieldStats]
+    _types: List[IcebergType]
+
+    def __init__(self, spec: PartitionSpec, schema: Schema):
+        self._types = [field.field_type for field in 
spec.partition_type(schema).fields]
+        self._fields = [PartitionFieldStats(field_type) for field_type in 
self._types]
+
+    def summaries(self) -> List[PartitionFieldSummary]:
+        return [field.to_summary() for field in self._fields]
+
+    def update(self, partition_keys: Record) -> PartitionSummary:

Review Comment:
   More on a meta-level: instead of this method and the class above, I would 
probably write a function that converts `PartitionSpec`s to partition 
summaries. I think that's more Pythonic (and, for me, also easier to follow, 
but that's a personal preference, of course).



##########
python/pyiceberg/manifest.py:
##########
@@ -14,31 +14,47 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
+from __future__ import annotations
+
+import math
+from abc import ABC, abstractmethod
 from enum import Enum
+from types import TracebackType
 from typing import (
     Any,
     Dict,
     Iterator,
     List,
     Optional,
+    Type,
 )
 
-from pyiceberg.avro.file import AvroFile
-from pyiceberg.io import FileIO, InputFile
+from pyiceberg import conversions
+from pyiceberg.avro.file import AvroFile, AvroOutputFile
+from pyiceberg.conversions import to_bytes
+from pyiceberg.exceptions import ValidationError
+from pyiceberg.io import FileIO, InputFile, OutputFile
+from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
+    IcebergType,
     IntegerType,
     ListType,
     LongType,
     MapType,
     NestedField,
+    PrimitiveType,
     StringType,
     StructType,
 )
 
+# TODO: Double-check what's its purpose in java

Review Comment:
   I'm not exactly sure what you're referring to. When writing 
ManifestEntries, the sequence number is set to `null` because, if there is a 
conflict when we commit, we can retry, and when retrying we don't want to have 
to rewrite the manifest files just to update the sequence number. Therefore 
it is left `null` when written the first time. This is called sequence number 
inheritance: https://iceberg.apache.org/spec/#sequence-number-inheritance
   



##########
python/pyiceberg/manifest.py:
##########
@@ -363,3 +459,232 @@ def _inherit_sequence_number(entry: ManifestEntry, 
manifest: ManifestFile) -> Ma
         entry.file_sequence_number = manifest.sequence_number
 
     return entry
+
+
+class ManifestWriter(ABC):
+    closed: bool
+    _spec: PartitionSpec
+    _output_file: OutputFile
+    _writer: AvroOutputFile[ManifestEntry]
+    _snapshot_id: int
+    _meta: Dict[str, str]
+    _added_files: int
+    _added_rows: int
+    _existing_files: int
+    _existing_rows: int
+    _deleted_files: int
+    _deleted_rows: int
+    _min_data_sequence_number: Optional[int]
+    _partition_summary: PartitionSummary
+
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int, meta: Dict[str, str]):
+        self.closed = False
+        self._spec = spec
+        self._output_file = output_file
+        self._snapshot_id = snapshot_id
+        self._meta = meta
+
+        self._added_files = 0
+        self._added_rows = 0
+        self._existing_files = 0
+        self._existing_rows = 0
+        self._deleted_files = 0
+        self._deleted_rows = 0
+        self._min_data_sequence_number = None
+        self._partition_summary = PartitionSummary(spec, schema)
+        self._manifest_entry_schema = 
manifest_entry_schema(data_file_type(spec.partition_type(schema)))
+
+    def __enter__(self) -> ManifestWriter:
+        """Opens the writer."""
+        self._writer = AvroOutputFile[ManifestEntry](self._output_file, 
self._manifest_entry_schema, "manifest_entry", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Closes the writer."""
+        self.closed = True
+        self._writer.__exit__(exc_type, exc_value, traceback)
+
+    @abstractmethod
+    def content(self) -> ManifestContent:
+        ...
+
+    def to_manifest_file(self) -> ManifestFile:
+        """Returns the manifest file."""
+        # once the manifest file is generated, no more entries can be added
+        self.closed = True
+        min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
+        return ManifestFile(
+            manifest_path=self._output_file.location,
+            manifest_length=len(self._writer.output_file),
+            partition_spec_id=self._spec.spec_id,
+            content=self.content(),
+            sequence_number=UNASSIGNED_SEQ,
+            min_sequence_number=min_sequence_number,
+            added_snapshot_id=self._snapshot_id,
+            added_files_count=self._added_files,
+            existing_files_count=self._existing_files,
+            deleted_files_count=self._deleted_files,
+            added_rows_count=self._added_rows,
+            existing_rows_count=self._existing_rows,
+            deleted_rows_count=self._deleted_rows,
+            partitions=self._partition_summary.summaries(),
+            key_metadatas=None,
+        )
+
+    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
+        if self.closed:
+            raise RuntimeError("Cannot add entry to closed manifest writer")
+        if entry.status == ManifestEntryStatus.ADDED:
+            self._added_files += 1
+            self._added_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.EXISTING:
+            self._existing_files += 1
+            self._existing_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.DELETED:
+            self._deleted_files += 1
+            self._deleted_rows += entry.data_file.record_count
+
+        self._partition_summary.update(entry.data_file.partition)
+
+        if (
+            (entry.status == ManifestEntryStatus.ADDED or entry.status == 
ManifestEntryStatus.EXISTING)
+            and entry.data_sequence_number is not None
+            and (self._min_data_sequence_number is None or 
entry.data_sequence_number < self._min_data_sequence_number)
+        ):
+            self._min_data_sequence_number = entry.data_sequence_number
+
+        self._writer.write_block([entry])
+        return self
+
+
+class ManifestWriterV1(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {

Review Comment:
   More of a style thing, but I would prefer named arguments here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to