rdblue commented on code in PR #6490: URL: https://github.com/apache/iceberg/pull/6490#discussion_r1059524353
########## python/pyiceberg/manifest.py: ########## @@ -76,137 +66,283 @@ def __repr__(self) -> str: return f"FileFormat.{self.name}" -class DataFile(IcebergBaseModel): - content: DataFileContent = Field(default=DataFileContent.DATA) - file_path: str = Field() - file_format: FileFormat = Field() - partition: Dict[str, Any] = Field() - record_count: int = Field() - file_size_in_bytes: int = Field() - block_size_in_bytes: Optional[int] = Field() - column_sizes: Optional[Dict[int, int]] = Field() - value_counts: Optional[Dict[int, int]] = Field() - null_value_counts: Optional[Dict[int, int]] = Field() - nan_value_counts: Optional[Dict[int, int]] = Field() - distinct_counts: Optional[Dict[int, int]] = Field() - lower_bounds: Optional[Dict[int, bytes]] = Field() - upper_bounds: Optional[Dict[int, bytes]] = Field() - key_metadata: Optional[bytes] = Field() - split_offsets: Optional[List[int]] = Field() - equality_ids: Optional[List[int]] = Field() - sort_order_id: Optional[int] = Field() - - -class ManifestEntry(IcebergBaseModel): - status: ManifestEntryStatus = Field() - snapshot_id: Optional[int] = Field() - sequence_number: Optional[int] = Field() - data_file: DataFile = Field() - - -class PartitionFieldSummary(IcebergBaseModel): - contains_null: bool = Field() - contains_nan: Optional[bool] = Field() - lower_bound: Optional[bytes] = Field() - upper_bound: Optional[bytes] = Field() - - -class ManifestFile(IcebergBaseModel): - manifest_path: str = Field() - manifest_length: int = Field() - partition_spec_id: int = Field() - content: ManifestContent = Field(default=ManifestContent.DATA) - sequence_number: int = Field(default=0) - min_sequence_number: int = Field(default=0) - added_snapshot_id: Optional[int] = Field() - added_data_files_count: Optional[int] = Field() - existing_data_files_count: Optional[int] = Field() - deleted_data_files_count: Optional[int] = Field() - added_rows_count: Optional[int] = Field() - existing_rows_counts: Optional[int] = Field() - 
deleted_rows_count: Optional[int] = Field() - partitions: Optional[List[PartitionFieldSummary]] = Field() - key_metadata: Optional[bytes] = Field() - - def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]: +class DataFile(Record): + @staticmethod + def from_record(record: Record, format_version: int) -> Union[DataFileV1, DataFileV2]: + if format_version == 1: + return DataFileV1(*record) + elif format_version == 2: + return DataFileV2(*record) + else: + raise ValueError(f"Unknown format-version: {format_version}") + + file_path: str + file_format: FileFormat + partition: Record + record_count: int + file_size_in_bytes: int + block_size_in_bytes: Optional[int] = None + column_sizes: Optional[Dict[int, int]] = None + value_counts: Optional[Dict[int, int]] = None + null_value_counts: Optional[Dict[int, int]] = None + nan_value_counts: Optional[Dict[int, int]] = None + distinct_counts: Optional[Dict[int, int]] = None # Does not seem to be used on the Java side!? + lower_bounds: Optional[Dict[int, bytes]] = None + upper_bounds: Optional[Dict[int, bytes]] = None + key_metadata: Optional[bytes] = None + split_offsets: Optional[List[int]] = None + equality_ids: Optional[List[int]] = None + sort_order_id: Optional[int] = None + content: DataFileContent = DataFileContent.DATA + + +class DataFileV1(DataFile): + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 0: + self.file_path = value + elif pos == 1: + self.file_format = value + elif pos == 2: + self.partition = value + elif pos == 3: + self.record_count = value + elif pos == 4: + self.file_size_in_bytes = value + elif pos == 5: + self.block_size_in_bytes = value + elif pos == 6: + self.column_sizes = value + elif pos == 7: + self.value_counts = value + elif pos == 8: + self.null_value_counts = value + elif pos == 9: + self.nan_value_counts = value + elif pos == 10: + self.lower_bounds = value + elif pos == 11: + self.upper_bounds = value + elif pos == 12: + self.key_metadata = value + elif pos 
== 13: + self.split_offsets = value + elif pos == 14: + self.sort_order_id = value + + +class DataFileV2(DataFile): + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 0: + self.content = value + elif pos == 1: + self.file_path = value + elif pos == 2: + self.file_format = value + elif pos == 3: + self.partition = value + elif pos == 4: + self.record_count = value + elif pos == 5: + self.file_size_in_bytes = value + elif pos == 6: + self.column_sizes = value + elif pos == 7: + self.value_counts = value + elif pos == 8: + self.null_value_counts = value + elif pos == 9: + self.nan_value_counts = value + elif pos == 10: + self.lower_bounds = value + elif pos == 11: + self.upper_bounds = value + elif pos == 12: + self.key_metadata = value + elif pos == 13: + self.split_offsets = value + elif pos == 14: + self.equality_ids = value + elif pos == 15: + self.sort_order_id = value + + +class ManifestEntry(Record): + status: ManifestEntryStatus + data_file: DataFile + snapshot_id: Optional[int] = None + sequence_number: Optional[int] = None + file_sequence_number: Optional[int] = None + + @staticmethod + def from_record(record: Record, format_version: int) -> Union[ManifestEntryV1, ManifestEntryV2]: + if format_version == 1: + return ManifestEntryV1(*record) + elif format_version == 2: + return ManifestEntryV2(*record) + else: + raise ValueError(f"Unknown format-version: {format_version}") + + +class ManifestEntryV1(ManifestEntry): + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 0: + self.status = ManifestEntryStatus(value) + elif pos == 1: + self.snapshot_id = value + elif pos == 2: + self.data_file = DataFile.from_record(value, format_version=1) + + +class ManifestEntryV2(ManifestEntry): + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 0: + self.status = ManifestEntryStatus(value) + elif pos == 1: + self.snapshot_id = value + elif pos == 2: + self.sequence_number = value + elif pos == 3: + self.file_sequence_number = 
value + elif pos == 4: + self.data_file = DataFile.from_record(value, format_version=2) + + +class PartitionFieldSummary(Record): + @staticmethod + def from_record(record: Record) -> PartitionFieldSummary: + return PartitionFieldSummary(*record) + + contains_null: bool + contains_nan: Optional[bool] + lower_bound: Optional[bytes] + upper_bound: Optional[bytes] + + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 0: + self.contains_null = value + elif pos == 1: + self.contains_nan = value + elif pos == 2: + self.lower_bound = value + elif pos == 3: + self.upper_bound = value + # ignore the object, it must be from a newer version of the format + + +class ManifestFile(Record): + @staticmethod + def from_record(record: Record, format_version: int) -> ManifestFile: + if format_version == 1: + return ManifestFileV1(*record) + elif format_version == 2: + return ManifestFileV2(*record) + else: + raise ValueError(f"Unknown format-version: {format_version}") + + manifest_path: str + manifest_length: int + partition_spec_id: int + added_snapshot_id: int # v2 + content: ManifestContent = ManifestContent.DATA # v2 + sequence_number: Optional[int] = None # v2 + min_sequence_number: Optional[int] = None # v2 + added_files_count: Optional[int] = None + existing_files_count: Optional[int] = None + deleted_files_count: Optional[int] = None + added_rows_count: Optional[int] = None + existing_rows_count: Optional[int] = None + deleted_rows_count: Optional[int] = None + partitions: Optional[List[PartitionFieldSummary]] = None + key_metadata: Optional[bytes] = None + + def fetch_manifest_entry(self, io: FileIO, format_version: int) -> List[ManifestEntry]: file = io.new_input(self.manifest_path) - return list(read_manifest_entry(file)) - - -def read_manifest_entry(input_file: InputFile) -> Iterator[ManifestEntry]: + return read_manifest_entry(file, format_version) + + +class ManifestFileV1(ManifestFile): + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 
0: + self.manifest_path = value + elif pos == 1: + self.manifest_length = value + elif pos == 2: + self.partition_spec_id = value + elif pos == 3: + self.added_snapshot_id = value + elif pos == 4: + self.added_files_count = value + elif pos == 5: + self.existing_files_count = value + elif pos == 6: + self.deleted_files_count = value + elif pos == 7: + self.partitions = [ + element if isinstance(element, PartitionFieldSummary) else PartitionFieldSummary(*element) for element in value + ] + elif pos == 8: + self.added_rows_count = value + elif pos == 9: + self.existing_rows_count = value + elif pos == 10: + self.deleted_rows_count = value + # ignore the object, it must be from a newer version of the format + + +class ManifestFileV2(ManifestFile): + def __setitem__(self, pos: int, value: Any) -> None: + if pos == 0: + self.manifest_path = value + elif pos == 1: + self.manifest_length = value + elif pos == 2: + self.partition_spec_id = value + elif pos == 3: + self.content = ManifestContent(value) + elif pos == 4: + self.sequence_number = value + elif pos == 5: + self.min_sequence_number = value + elif pos == 6: + self.added_snapshot_id = value + elif pos == 7: + self.added_files_count = value + elif pos == 8: + self.existing_files_count = value + elif pos == 9: + self.deleted_files_count = value + elif pos == 10: + self.added_rows_count = value + elif pos == 11: + self.existing_rows_count = value + elif pos == 12: + self.deleted_rows_count = value + elif pos == 13: + self.partitions = value + elif pos == 14: + self.key_metadata = value + # ignore the object, it must be from a newer version of the format + + +def read_manifest_entry(input_file: InputFile, format_version: int) -> List[ManifestEntry]: with AvroFile(input_file) as reader: - schema = reader.schema - for record in reader: - dict_repr = _convert_pos_to_dict(schema, record) - yield ManifestEntry(**dict_repr) + return [ManifestEntry.from_record(record, format_version) for record in reader] Review Comment: 
Rather than passing the format version down everywhere, I think this should project based on the file schema and a requested (read) schema. All metadata files are maintained with compatible schema evolution, so you can always read them using the latest schema (just like any table data file). Plus, we want to be able to handle projection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org