Fokko commented on code in PR #1626:
URL: https://github.com/apache/iceberg-python/pull/1626#discussion_r2079109317


##########
pyiceberg/table/inspect.py:
##########
@@ -523,7 +523,67 @@ def history(self) -> "pa.Table":
 
         return pa.Table.from_pylist(history, schema=history_schema)
 
-    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: 
Optional[Set[DataFileContent]] = None) -> "pa.Table":
+    def _get_files_from_manifest(
+        self, manifest_list: ManifestFile, data_file_filter: 
Optional[Set[DataFileContent]] = None
+    ) -> "pa.Table":
+        import pyarrow as pa
+
+        files: list[dict[str, Any]] = []
+        schema = self.tbl.metadata.schema()
+        io = self.tbl.io
+
+        for manifest_entry in manifest_list.fetch_manifest_entry(io):
+            data_file = manifest_entry.data_file
+            if data_file_filter and data_file.content not in data_file_filter:
+                continue
+            column_sizes = data_file.column_sizes or {}
+            value_counts = data_file.value_counts or {}
+            null_value_counts = data_file.null_value_counts or {}
+            nan_value_counts = data_file.nan_value_counts or {}
+            lower_bounds = data_file.lower_bounds or {}
+            upper_bounds = data_file.upper_bounds or {}
+            readable_metrics = {
+                schema.find_column_name(field.field_id): {
+                    "column_size": column_sizes.get(field.field_id),
+                    "value_count": value_counts.get(field.field_id),
+                    "null_value_count": null_value_counts.get(field.field_id),
+                    "nan_value_count": nan_value_counts.get(field.field_id),
+                    "lower_bound": from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None,
+                    "upper_bound": from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None,
+                }
+                for field in self.tbl.metadata.schema().fields
+            }
+            files.append(
+                {
+                    "content": data_file.content,
+                    "file_path": data_file.file_path,
+                    "file_format": data_file.file_format,
+                    "spec_id": data_file.spec_id,

Review Comment:
   In Spark we also have the partition column; I think it would be good to add 
that one here as well:
   
   
https://github.com/apache/iceberg-python/blob/9fff025cfba8ff44d8eb779c6a039ac278340c75/pyiceberg/table/inspect.py#L124-L125



##########
tests/integration/test_inspect_table.py:
##########
@@ -942,3 +949,83 @@ def test_inspect_all_manifests(spark: SparkSession, 
session_catalog: Catalog, fo
     lhs = spark.table(f"{identifier}.all_manifests").toPandas()
     rhs = df.to_pandas()
     assert_frame_equal(lhs, rhs, check_dtype=False)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_all_files(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_files"
+
+    tbl = _create_table(session_catalog, identifier, 
properties={"format-version": format_version})
+
+    # append three times
+    for _ in range(3):
+        tbl.append(arrow_table_with_null)
+
+    # configure table properties
+    if format_version == 2:
+        with tbl.transaction() as txn:
+            txn.set_properties({"write.delete.mode": "merge-on-read"})
+            txn.set_properties({"write.update.mode": "merge-on-read"})
+    spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
+    tbl.refresh()
+    tbl.append(arrow_table_with_null)
+    spark.sql(f"UPDATE {identifier} SET string = 'b' WHERE int = 9")
+    spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
+    tbl.refresh()
+
+    all_files_df = tbl.inspect.all_files()
+    all_data_files_df = tbl.inspect.all_data_files()
+    all_delete_files_df = tbl.inspect.all_delete_files()
+
+    _inspect_files_asserts(all_files_df, 
spark.table(f"{identifier}.all_files"))
+    _inspect_files_asserts(all_data_files_df, 
spark.table(f"{identifier}.all_data_files"))
+    _inspect_files_asserts(all_delete_files_df, 
spark.table(f"{identifier}.all_delete_files"))
+
+
+@pytest.mark.integration
+def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: 
Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.table_metadata_files"
+
+    tbl = _create_table(
+        session_catalog,
+        identifier,
+        properties={
+            "format-version": "3",
+            "write.delete.mode": "merge-on-read",
+            "write.update.mode": "merge-on-read",
+            "write.merge.mode": "merge-on-read",
+        },
+    )
+
+    insert_data_sql = f"""INSERT INTO {identifier} VALUES
+        (false, 'a', 'aaaaaaaaaaaaaaaaaaaaaa', 1, 1, 0.0, 0.0, 
TIMESTAMP('2023-01-01 19:25:00'), TIMESTAMP('2023-01-01 19:25:00+00:00'), 
DATE('2023-01-01'), X'01', X'00000000000000000000000000000000'),
+        (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 
NULL),
+        (true, 'z', 'zzzzzzzzzzzzzzzzzzzzzz', 9, 9, 0.9, 0.9, 
TIMESTAMP('2023-03-01 19:25:00'), TIMESTAMP('2023-03-01 19:25:00+00:00'), 
DATE('2023-03-01'), X'12', X'11111111111111111111111111111111');
+    """
+
+    spark.sql(insert_data_sql)
+    spark.sql(insert_data_sql)
+    spark.sql(f"UPDATE {identifier} SET int = 2 WHERE int = 1")
+    spark.sql(f"DELETE FROM {identifier} WHERE int = 9")
+    spark.table(identifier).show(20, False)

Review Comment:
   I think this was left over from testing? It triggers a Spark action, which 
slows the tests down quite a bit.



##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +671,30 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, 
True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = 
None) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_files_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        manifest_lists = executor.map(lambda snapshot: 
snapshot.manifests(self.tbl.io), snapshots)
+
+        unique_manifests = {(manifest.manifest_path, manifest) for 
manifest_list in manifest_lists for manifest in manifest_list}
+
+        file_lists = executor.map(
+            lambda args: self._get_files_from_manifest(*args), [(manifest, 
data_file_filter) for _, manifest in unique_manifests]
+        )
+
+        return pa.concat_tables(file_lists)
+
+    def all_files(self) -> "pa.Table":
+        return self._all_files()
+
+    def all_data_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.DATA})
+
+    def all_delete_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.POSITION_DELETES, 
DataFileContent.EQUALITY_DELETES})

Review Comment:
   Yes, let's do that in a separate PR: 
https://github.com/apache/iceberg-python/issues/1982



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to