kevinjqliu commented on code in PR #1608:
URL: https://github.com/apache/iceberg-python/pull/1608#discussion_r1947916711
##########
tests/integration/test_inspect_table.py:
##########
@@ -938,3 +938,111 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo
     lhs = spark.table(f"{identifier}.all_manifests").toPandas()
     rhs = df.to_pandas()
     assert_frame_equal(lhs, rhs, check_dtype=False)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_all_entries"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        TBLPROPERTIES ('write.update.mode'='merge-on-read',
+                       'write.delete.mode'='merge-on-read')
+        """
+    )
+    tbl = session_catalog.load_table(identifier)
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")
+    spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")
+
+    spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")
+
+    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")
+
+    spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")
+
+    def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
+        assert df.column_names == [
+            "status",
+            "snapshot_id",
+            "sequence_number",
+            "file_sequence_number",
+            "data_file",
+            "readable_metrics",
+        ]
+
+        # Make sure that they are filled properly

Review Comment:
   nit: add a comment to signal that this checks the first 4 columns, and that the rest of the test checks the last 2, `data_file` and `readable_metrics`.
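   For illustration, the suggested comment might read something like this (a sketch only; the `pa.table` literal is stand-in data, and the assertions mirror the test above):

```python
import pyarrow as pa

# Stand-in for the table returned by the inspect call in the test above.
df = pa.table({"status": [1], "snapshot_id": [123], "sequence_number": [1], "file_sequence_number": [1]})

# Check that the first 4 (integer) columns are filled properly; the remaining two,
# `data_file` and `readable_metrics`, are covered by the checks further down in the test.
for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]:
    for value in df[int_column]:
        assert isinstance(value.as_py(), int)
```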
##########
pyiceberg/table/inspect.py:
##########
@@ -157,74 +158,92 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
             ]
         )
+        return entries_schema
+    def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table":
+        import pyarrow as pa
+
+        schema = self.tbl.metadata.schema()

Review Comment:
   Something I'm concerned about with the `all_*` metadata tables is taking into account table schema/partition evolution. Since we're looking across all snapshots, it might not be right to use the current schema here.
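   To make the concern concrete, here is a rough sketch (not this PR's implementation) of resolving the schema that was in effect for a given snapshot rather than always using the current one; the helper name `_schema_for_snapshot` is hypothetical, and it relies only on the `schemas` list and the per-snapshot `schema_id` already present in the table metadata:

```python
from typing import Optional

from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata


def _schema_for_snapshot(metadata: TableMetadata, snapshot_id: Optional[int]) -> Schema:
    """Hypothetical helper: pick the schema that was current when a snapshot was written."""
    if snapshot_id is not None:
        # Snapshots written after schema evolution carry the schema-id they were written with.
        snapshot = next((s for s in metadata.snapshots if s.snapshot_id == snapshot_id), None)
        if snapshot is not None and snapshot.schema_id is not None:
            schemas_by_id = {s.schema_id: s for s in metadata.schemas}
            return schemas_by_id.get(snapshot.schema_id, metadata.schema())
    # Fall back to the table's current schema (what _get_entries does today).
    return metadata.schema()
```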
##########
pyiceberg/table/inspect.py:
##########
@@ -157,74 +158,92 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
             ]
         )
+        return entries_schema
+    def _get_entries(self, manifest: ManifestFile, discard_deleted: bool = True) -> "pa.Table":
+        import pyarrow as pa
+
+        schema = self.tbl.metadata.schema()
+        entries_schema = self._get_entries_schema()
         entries = []
-        snapshot = self._get_snapshot(snapshot_id)
-        for manifest in snapshot.manifests(self.tbl.io):
-            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
-                column_sizes = entry.data_file.column_sizes or {}
-                value_counts = entry.data_file.value_counts or {}
-                null_value_counts = entry.data_file.null_value_counts or {}
-                nan_value_counts = entry.data_file.nan_value_counts or {}
-                lower_bounds = entry.data_file.lower_bounds or {}
-                upper_bounds = entry.data_file.upper_bounds or {}
-                readable_metrics = {
-                    schema.find_column_name(field.field_id): {
-                        "column_size": column_sizes.get(field.field_id),
-                        "value_count": value_counts.get(field.field_id),
-                        "null_value_count": null_value_counts.get(field.field_id),
-                        "nan_value_count": nan_value_counts.get(field.field_id),
-                        # Makes them readable
-                        "lower_bound": from_bytes(field.field_type, lower_bound)
-                        if (lower_bound := lower_bounds.get(field.field_id))
-                        else None,
-                        "upper_bound": from_bytes(field.field_type, upper_bound)
-                        if (upper_bound := upper_bounds.get(field.field_id))
-                        else None,
-                    }
-                    for field in self.tbl.metadata.schema().fields
+        for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted):
+            column_sizes = entry.data_file.column_sizes or {}
+            value_counts = entry.data_file.value_counts or {}
+            null_value_counts = entry.data_file.null_value_counts or {}
+            nan_value_counts = entry.data_file.nan_value_counts or {}
+            lower_bounds = entry.data_file.lower_bounds or {}
+            upper_bounds = entry.data_file.upper_bounds or {}
+            readable_metrics = {
+                schema.find_column_name(field.field_id): {
+                    "column_size": column_sizes.get(field.field_id),
+                    "value_count": value_counts.get(field.field_id),
+                    "null_value_count": null_value_counts.get(field.field_id),
+                    "nan_value_count": nan_value_counts.get(field.field_id),
+                    # Makes them readable
+                    "lower_bound": from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None,
+                    "upper_bound": from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None,
                 }
+                for field in self.tbl.metadata.schema().fields
+            }

-                partition = entry.data_file.partition
-                partition_record_dict = {
-                    field.name: partition[pos]
-                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
-                }
+            partition = entry.data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+            }

-                entries.append(
-                    {
-                        "status": entry.status.value,
-                        "snapshot_id": entry.snapshot_id,
-                        "sequence_number": entry.sequence_number,
-                        "file_sequence_number": entry.file_sequence_number,
-                        "data_file": {
-                            "content": entry.data_file.content,
-                            "file_path": entry.data_file.file_path,
-                            "file_format": entry.data_file.file_format,
-                            "partition": partition_record_dict,
-                            "record_count": entry.data_file.record_count,
-                            "file_size_in_bytes": entry.data_file.file_size_in_bytes,
-                            "column_sizes": dict(entry.data_file.column_sizes),
-                            "value_counts": dict(entry.data_file.value_counts),
-                            "null_value_counts": dict(entry.data_file.null_value_counts),
-                            "nan_value_counts": dict(entry.data_file.nan_value_counts),
-                            "lower_bounds": entry.data_file.lower_bounds,
-                            "upper_bounds": entry.data_file.upper_bounds,
-                            "key_metadata": entry.data_file.key_metadata,
-                            "split_offsets": entry.data_file.split_offsets,
-                            "equality_ids": entry.data_file.equality_ids,
-                            "sort_order_id": entry.data_file.sort_order_id,
-                            "spec_id": entry.data_file.spec_id,
-                        },
-                        "readable_metrics": readable_metrics,
-                    }
-                )
+            entries.append(
+                {
+                    "status": entry.status.value,
+                    "snapshot_id": entry.snapshot_id,
+                    "sequence_number": entry.sequence_number,
+                    "file_sequence_number": entry.file_sequence_number,
+                    "data_file": {
+                        "content": entry.data_file.content,
+                        "file_path": entry.data_file.file_path,
+                        "file_format": entry.data_file.file_format,
+                        "partition": partition_record_dict,
+                        "record_count": entry.data_file.record_count,
+                        "file_size_in_bytes": entry.data_file.file_size_in_bytes,
+                        "column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None,

Review Comment:
   how about
   ```
   "column_sizes": dict(entry.data_file.column_sizes) or None
   ```
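   For context, a small sketch (not part of the PR) showing how the two expressions differ on edge cases; `column_sizes` here is just a stand-in local variable:

```python
# `dict(x) or None` collapses an empty dict to None and raises on None itself,
# while the explicit None-check keeps an empty dict and only maps None to None.
for column_sizes in ({1: 100, 2: 200}, {}):
    via_or = dict(column_sizes) or None
    via_check = dict(column_sizes) if column_sizes is not None else None
    print(column_sizes, via_or, via_check)  # for {} this prints None vs {}

column_sizes = None
# dict(column_sizes) or None  # would raise TypeError: 'NoneType' object is not iterable
print(dict(column_sizes) if column_sizes is not None else None)  # -> None
```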
##########
tests/integration/test_inspect_table.py:
##########
@@ -938,3 +938,111 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo
+        # Make sure that they are filled properly
+        for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]:
+            for value in df[int_column]:
+                assert isinstance(value.as_py(), int)
+
+        for snapshot_id in df["snapshot_id"]:
+            assert isinstance(snapshot_id.as_py(), int)
+
+        lhs = df.to_pandas()
+        lhs["content"] = lhs["data_file"].apply(lambda x: x.get("content"))
+        lhs["file_path"] = lhs["data_file"].apply(lambda x: x.get("file_path"))
+        lhs = lhs.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop(
+            columns=["file_path", "content"]
+        )

Review Comment:
   nit: maybe inline these operations to show that the same operations are performed on lhs and rhs
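   One way to make that explicit, sketched here with a hypothetical `sort_and_trim` helper rather than code from the PR, so that both sides visibly go through the same steps:

```python
import pandas as pd


def sort_and_trim(pdf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical helper: derive the sort keys, order the rows, then drop the derived
    # columns, so lhs and rhs are run through exactly the same operations.
    pdf = pdf.copy()
    pdf["content"] = pdf["data_file"].apply(lambda x: x.get("content"))
    pdf["file_path"] = pdf["data_file"].apply(lambda x: x.get("file_path"))
    return pdf.sort_values(["status", "snapshot_id", "sequence_number", "content", "file_path"]).drop(
        columns=["file_path", "content"]
    )


# In the test this would be applied to both sides, e.g.:
# lhs = sort_and_trim(df.to_pandas())
# rhs = sort_and_trim(spark_df.toPandas())
```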
##########
pyiceberg/table/inspect.py:
##########
@@ -157,74 +158,92 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
+                        "column_sizes": dict(entry.data_file.column_sizes) if entry.data_file.column_sizes is not None else None,
+                        "value_counts": dict(entry.data_file.value_counts) if entry.data_file.value_counts is not None else None,
+                        "null_value_counts": dict(entry.data_file.null_value_counts)
+                        if entry.data_file.null_value_counts is not None
+                        else None,
+                        "nan_value_counts": dict(entry.data_file.nan_value_counts)
+                        if entry.data_file.nan_value_counts is not None
+                        else None,
+                        "lower_bounds": entry.data_file.lower_bounds,
+                        "upper_bounds": entry.data_file.upper_bounds,
+                        "key_metadata": entry.data_file.key_metadata,
+                        "split_offsets": entry.data_file.split_offsets,
+                        "equality_ids": entry.data_file.equality_ids,
+                        "sort_order_id": entry.data_file.sort_order_id,
+                        "spec_id": entry.data_file.spec_id,
+                    },
+                    "readable_metrics": readable_metrics,
+                }
+            )
         return pa.Table.from_pylist(
             entries,
             schema=entries_schema,
         )
+    def entries(self, snapshot_id: Optional[int] = None, discard_deleted: bool = True) -> "pa.Table":

Review Comment:
   What is `discard_deleted` used for?
##########
pyiceberg/table/inspect.py:
##########
@@ -157,74 +158,92 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
         return pa.Table.from_pylist(
             entries,
             schema=entries_schema,
         )
+    def entries(self, snapshot_id: Optional[int] = None, discard_deleted: bool = True) -> "pa.Table":

Review Comment:
   The `entries` metadata table should show both data and delete files, per https://iceberg.apache.org/docs/nightly/spark-queries/#entries:
   ```
   To show all the table's current manifest entries for both data and delete files.
   ```
   Also, `discard_deleted==True` is essentially getting just the data files, which we can do with the `files` metadata table.
##########
tests/integration/test_inspect_table.py:
##########
@@ -938,3 +938,111 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo
+        # Make sure that they are filled properly
+        for int_column in ["status", "snapshot_id", "sequence_number", "file_sequence_number"]:
+            for value in df[int_column]:
+                assert isinstance(value.as_py(), int)
+
+        for snapshot_id in df["snapshot_id"]:
+            assert isinstance(snapshot_id.as_py(), int)

Review Comment:
   This is redundant, right? Already included in the above.