rkuhlercadent commented on issue #1401: URL: https://github.com/apache/iceberg-python/issues/1401#issuecomment-2523813801
Here is a Python script that demonstrates the issue.

```
import os
import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Record
from pyiceberg.types import StringType, IntegerType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.name_mapping import create_mapping_from_schema
import pyarrow as pa
import pyarrow.parquet as pq


def demonstrate_identity_partition_scan_issue():
    # We have petabytes of parquet data in Hive format on S3 already that we are cataloging in Iceberg format.
    # Note that these parquet files do NOT have the partition columns in them, which is standard for Hive format.
    # The partition values must be taken from the Iceberg metadata for the identity partition columns as
    # specified in the Iceberg spec: https://iceberg.apache.org/spec/#column-projection
    # "Values for field ids which are not present in a data file must be resolved according to the following rules:
    # Return the value from partition metadata if an Identity Transform exists for the field and the partition
    # value is present in the partition struct on data_file object in the manifest. This allows for metadata
    # only migrations of Hive tables."
    warehouse_path = os.path.dirname(os.path.realpath(__file__))
    namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
    table_name = "IDENTITY_PARTITION_SCAN_ISSUE"

    catalog = get_iceberg_catalog(warehouse_path)
    drop_catalog_entities_for_test(catalog, namespace_name)

    # create sample hive files
    catalog.create_namespace(namespace_name)
    sample_hive_parquet_file = create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 202412)

    # catalog existing hive data in iceberg
    table = create_iceberg_table(catalog, namespace_name, table_name)
    add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)

    # The partition_id column should have values from the metadata, not null, in this output.
    # This same iceberg metadata correctly returns the partition_id column values in Spark, Athena, and Snowflake.
    print(table.scan().to_arrow())


def get_iceberg_catalog(warehouse_path):
    # using a sqlite catalog on the local filesystem for the demo
    catalog = SqlCatalog(
        "default",
        **{
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        })
    return catalog


def drop_catalog_entities_for_test(catalog, namespace_name):
    if namespace_name in [n[0] for n in catalog.list_namespaces()]:
        for _, table_name in catalog.list_tables(namespace_name):
            catalog.drop_table(f"{namespace_name}.{table_name}")
        catalog.drop_namespace(namespace_name)


def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
    location = f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
    os.makedirs(os.path.dirname(location), exist_ok=True)
    name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
    names = pa.array([name], type=pa.string())
    pq.write_table(pa.table([names], names=["name"]), location)
    return {
        "location": location,
        "file_size": os.path.getsize(location),
        "partition_id": partition_id
    }


def create_iceberg_table(catalog, namespace_name, table_name):
    print("creating iceberg table")
    schema = Schema(
        NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False))
    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
    table = catalog.create_table(
        f"{namespace_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()})
    return table


def add_data_file(table, hive_data_file, spec_id):
    print("adding data file")
    parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
    stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=stats_columns,
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=hive_data_file.get("location"),
        file_format=FileFormat.PARQUET,
        partition=Record(partition_id=hive_data_file.get("partition_id")),
        file_size_in_bytes=hive_data_file.get("file_size"),
        sort_order_id=None,
        spec_id=spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict())
    with table.transaction() as tx:
        with tx.update_snapshot().overwrite() as update_snapshot:
            update_snapshot.append_data_file(data_file)


if __name__ == "__main__":
    demonstrate_identity_partition_scan_issue()
```
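As a follow-up check (not part of the reproduction above, just a sketch that assumes the script has already been run from the same directory and that the scan-planning API exposes `plan_files()` / `FileScanTask.file` as in recent pyiceberg releases), reading the planned data file directly shows it only contains the `name` column, while the `partition_id` value is recorded in the manifest's partition struct:

```
# Hypothetical verification sketch: assumes the demo script above already ran in
# this directory, creating pyiceberg_catalog.db and the sample table/data file.
import os

import pyarrow.parquet as pq
from pyiceberg.catalog.sql import SqlCatalog

warehouse_path = os.path.dirname(os.path.realpath(__file__))
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    })
table = catalog.load_table("IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE.IDENTITY_PARTITION_SCAN_ISSUE")

for task in table.scan().plan_files():
    # The Hive-style parquet file carries no partition_id column ...
    print(pq.read_table(task.file.file_path).column_names)  # expected: ['name']
    # ... but the partition value is present in the manifest's partition struct,
    # so per the column-projection rules the scan should project it into the result.
    print(task.file.partition)
```

In other words, the value needed to populate `partition_id` is available to the reader from the manifest; it just does not appear in the arrow output of `table.scan().to_arrow()`.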