kevinjqliu commented on issue #1401:
URL: 
https://github.com/apache/iceberg-python/issues/1401#issuecomment-2523929896

   Thanks for providing the test! I added a few print statements
   ```
   import os
   import datetime
   
   from pyiceberg.catalog.sql import SqlCatalog
   from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, 
compute_statistics_plan
   from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
   from pyiceberg.schema import Schema
   from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
   from pyiceberg.table import TableProperties
   from pyiceberg.typedef import Record
   from pyiceberg.types import StringType, IntegerType, NestedField
   from pyiceberg.partitioning import PartitionSpec, PartitionField
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.table.name_mapping import create_mapping_from_schema
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   
   def demonstrate_identity_partition_scan_issue():
       # we have petabytes of parquet data in hive format on s3 already that we 
are cataloging in iceberg format.
       # note that these parquet files do NOT have the partition columns in 
them which is standard for hive format.
       # the partition values must be taken from the iceberg metadata for the 
identity partition columns as
       # specified in the iceberg spec: 
https://iceberg.apache.org/spec/#column-projection
       # "Values for field ids which are not present in a data file must be 
resolved according the following rules:
       # Return the value from partition metadata if an Identity Transform 
exists for the field and the partition
       # value is present in the partition struct on data_file object in the 
manifest. This allows for metadata
       # only migrations of Hive tables."
       warehouse_path = os.path.dirname(os.path.realpath(__file__))
       namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
       table_name = "IDENTITY_PARTITION_SCAN_ISSUE"
       catalog = get_iceberg_catalog(warehouse_path)
       drop_catalog_entities_for_test(catalog, namespace_name)
       # create sample hive files
       sample_hive_parquet_file = 
create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 
202412)
       # catalog existing hive data in iceberg
       catalog.create_namespace(namespace_name)
       table = create_iceberg_table(catalog, namespace_name, table_name)
   
       print("Hive parquet data:\n", 
pq.read_table(sample_hive_parquet_file.get("location")))
       print()
       add_data_file(table, sample_hive_parquet_file, 
table.metadata.default_spec_id)
       # the partition_id columns should have values from the metadata not null 
in this output
       # this same iceberg metadata correctly returns the partition_id column 
values in spark, athena, and snowflake
       print("Table partitions:\n", table.inspect.partitions().to_pandas())
       print()
   
       print("Table scan:\n", table.scan().to_arrow())
       print()
   
   
   def get_iceberg_catalog(warehouse_path):
       # using sqlite catalog on local filesystem for demo
       catalog = SqlCatalog(
           "default",
           **{
               "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
               "warehouse": f"file://{warehouse_path}",
           })
       return catalog
   
   
   def drop_catalog_entities_for_test(catalog, namespace_name):
       if namespace_name in [n[0] for n in catalog.list_namespaces()]:
           for _, table_name in catalog.list_tables(namespace_name):
               catalog.drop_table(f"{namespace_name}.{table_name}")
           catalog.drop_namespace(namespace_name)
   
   
   def create_sample_hive_parquet_file(warehouse_path, namespace_name, 
table_name, partition_id):
       location = 
f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
       os.makedirs(os.path.dirname(location), exist_ok=True)
       name = datetime.datetime.strptime(str(partition_id), 
"%Y%m").strftime("%B %Y")
       names = pa.array([name], type=pa.string())
       pq.write_table(pa.table([names], names=["name"]), location)
       return {
           "location": location,
           "file_size": os.path.getsize(location),
           "partition_id": partition_id
       }
   
   
   def create_iceberg_table(catalog, namespace_name, table_name):
       print("creating iceberg table")
       schema = Schema(
           NestedField(field_id=1, name="partition_id", 
field_type=IntegerType(), required=False),
           NestedField(field_id=2, name="name", field_type=StringType(), 
required=False))
       partition_spec = PartitionSpec(
           PartitionField(source_id=1, field_id=1000, 
transform=IdentityTransform(), name="partition_id"))
       table = catalog.create_table(
           f"{namespace_name}.{table_name}",
           schema=schema,
           partition_spec=partition_spec,
           properties={TableProperties.DEFAULT_NAME_MAPPING: 
create_mapping_from_schema(schema).model_dump_json()})
       return table
   
   
   def add_data_file(table, hive_data_file, spec_id):
       print("adding data file")
       parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
       stats_columns = compute_statistics_plan(table.schema(), 
table.metadata.properties)
       statistics = data_file_statistics_from_parquet_metadata(
           parquet_metadata=parquet_metadata,
           stats_columns=stats_columns,
           parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
       data_file = DataFile(
           content=DataFileContent.DATA,
           file_path=hive_data_file.get("location"),
           file_format=FileFormat.PARQUET,
           partition=Record(partition_id=hive_data_file.get("partition_id")),
           file_size_in_bytes=hive_data_file.get("file_size"),
           sort_order_id=None,
           spec_id=spec_id,
           equality_ids=None,
           key_metadata=None,
           **statistics.to_serialized_dict())
       with table.transaction() as tx:
           with tx.update_snapshot().overwrite() as update_snapshot:
               update_snapshot.append_data_file(data_file)
   
   
   if __name__ == "__main__":
       demonstrate_identity_partition_scan_issue()
   ```
   
   And heres the output
   ```
   creating iceberg table
   Hive parquet data:
    pyarrow.Table
   name: string
   ----
   name: [["December 2024"]]
   
   adding data file
   Table partitions:
                      partition  spec_id  record_count  ...  
equality_delete_file_count         last_updated_at  last_updated_snapshot_id
   0  {'partition_id': 202412}        0             1  ...                      
     0 2024-12-06 18:37:00.672       4125530822203270775
   
   [1 rows x 11 columns]
   
   Table scan:
    pyarrow.Table
   partition_id: int32
   name: large_string
   ----
   partition_id: [[null]]
   name: [["December 2024"]]
   ```
   
   The issue is the IdentityTransform partition column `partition_id` is 
present in the metadata, but not in the table scan. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to