rkuhlercadent commented on issue #1401:
URL: https://github.com/apache/iceberg-python/issues/1401#issuecomment-2523813801

   Here is a Python script that demonstrates the issue.
   
   ```python
   import os
   import datetime
   
   from pyiceberg.catalog.sql import SqlCatalog
   from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
   from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
   from pyiceberg.schema import Schema
   from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
   from pyiceberg.table import TableProperties
   from pyiceberg.typedef import Record
   from pyiceberg.types import StringType, IntegerType, NestedField
   from pyiceberg.partitioning import PartitionSpec, PartitionField
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.table.name_mapping import create_mapping_from_schema
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   
   def demonstrate_identity_partition_scan_issue():
       # We already have petabytes of Parquet data in Hive layout on S3 that
       # we are cataloging in Iceberg format. Note that these Parquet files do
       # NOT contain the partition columns, which is standard for the Hive
       # layout. The partition values must be taken from the Iceberg metadata
       # for the identity partition columns, as specified in the Iceberg spec:
       # https://iceberg.apache.org/spec/#column-projection
       # "Values for field ids which are not present in a data file must be
       # resolved according to the following rules: Return the value from
       # partition metadata if an Identity Transform exists for the field and
       # the partition value is present in the partition struct on data_file
       # object in the manifest. This allows for metadata only migrations of
       # Hive tables."
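       # Applied to this repro: data.parquet below contains only the "name"
       # column, so a spec-compliant reader should synthesize
       # partition_id=202412 for each row from the partition struct stored in
       # the manifest entry, rather than returning null.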
       warehouse_path = os.path.dirname(os.path.realpath(__file__))
       namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
       table_name = "IDENTITY_PARTITION_SCAN_ISSUE"
       catalog = get_iceberg_catalog(warehouse_path)
       drop_catalog_entities_for_test(catalog, namespace_name)
       # create sample hive files
       catalog.create_namespace(namespace_name)
       sample_hive_parquet_file = create_sample_hive_parquet_file(
           warehouse_path, namespace_name, table_name, 202412)
       # catalog existing hive data in iceberg
       table = create_iceberg_table(catalog, namespace_name, table_name)
       add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)
       # the partition_id column should have values from the metadata, not
       # nulls, in this output; the same Iceberg metadata correctly returns
       # the partition_id column values in Spark, Athena, and Snowflake
       print(table.scan().to_arrow())
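       # For reference (derived from the values in this script, not captured
       # output): a compliant scan would yield partition_id == 202412 and
       # name == "December 2024" for the single row; with this issue,
       # partition_id comes back null instead.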
   
   
   def get_iceberg_catalog(warehouse_path):
       # using sqlite catalog on local filesystem for demo
       catalog = SqlCatalog(
           "default",
           **{
               "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
               "warehouse": f"file://{warehouse_path}",
           })
       return catalog
   
   
   def drop_catalog_entities_for_test(catalog, namespace_name):
       if namespace_name in [n[0] for n in catalog.list_namespaces()]:
           for _, table_name in catalog.list_tables(namespace_name):
               catalog.drop_table(f"{namespace_name}.{table_name}")
           catalog.drop_namespace(namespace_name)
   
   
   def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
       location = (f"{warehouse_path}/{namespace_name}.db/{table_name}/data/"
                   f"partition_id={partition_id}/data.parquet")
       os.makedirs(os.path.dirname(location), exist_ok=True)
       name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
       names = pa.array([name], type=pa.string())
       pq.write_table(pa.table([names], names=["name"]), location)
       return {
           "location": location,
           "file_size": os.path.getsize(location),
           "partition_id": partition_id
       }
   
   
   def create_iceberg_table(catalog, namespace_name, table_name):
       print("creating iceberg table")
       schema = Schema(
           NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
           NestedField(field_id=2, name="name", field_type=StringType(), required=False))
       partition_spec = PartitionSpec(
           PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
       table = catalog.create_table(
           f"{namespace_name}.{table_name}",
           schema=schema,
           partition_spec=partition_spec,
           properties={
               TableProperties.DEFAULT_NAME_MAPPING:
                   create_mapping_from_schema(schema).model_dump_json()})
       return table
   
   
   def add_data_file(table, hive_data_file, spec_id):
       print("adding data file")
       parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
       stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
       statistics = data_file_statistics_from_parquet_metadata(
           parquet_metadata=parquet_metadata,
           stats_columns=stats_columns,
           parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
       data_file = DataFile(
           content=DataFileContent.DATA,
           file_path=hive_data_file.get("location"),
           file_format=FileFormat.PARQUET,
           partition=Record(partition_id=hive_data_file.get("partition_id")),
           file_size_in_bytes=hive_data_file.get("file_size"),
           sort_order_id=None,
           spec_id=spec_id,
           equality_ids=None,
           key_metadata=None,
           **statistics.to_serialized_dict())
       with table.transaction() as tx:
           with tx.update_snapshot().overwrite() as update_snapshot:
               update_snapshot.append_data_file(data_file)
   
   
   if __name__ == "__main__":
       demonstrate_identity_partition_scan_issue()
   ```
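
   To double-check that the problem is on the read path, here is a minimal
   sketch (not from the original repro; it assumes the `table` object from
   the script above is still in scope and a pyiceberg release that includes
   the `inspect` metadata-tables API) that prints the partition struct
   recorded in the manifest entry:

   ```python
   # Inspect the committed manifest entries; each `data_file` struct should
   # carry the identity partition value written by add_data_file above.
   for entry in table.inspect.entries().to_pylist():
       print(entry["data_file"]["partition"])  # expect {'partition_id': 202412}
   ```

   If the partition value prints here while `table.scan()` still returns
   nulls, the metadata is intact and the column-projection rule quoted above
   is simply not being applied on read.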


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

