kevinjqliu commented on issue #1401: URL: https://github.com/apache/iceberg-python/issues/1401#issuecomment-2523929896
Thanks for providing the test! I added a few print statements:

```
import os
import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata, compute_statistics_plan
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Record
from pyiceberg.types import StringType, IntegerType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table.name_mapping import create_mapping_from_schema
import pyarrow as pa
import pyarrow.parquet as pq


def demonstrate_identity_partition_scan_issue():
    # we have petabytes of parquet data in hive format on s3 already that we are cataloging in iceberg format.
    # note that these parquet files do NOT have the partition columns in them, which is standard for hive format.
    # the partition values must be taken from the iceberg metadata for the identity partition columns as
    # specified in the iceberg spec: https://iceberg.apache.org/spec/#column-projection
    # "Values for field ids which are not present in a data file must be resolved according the following rules:
    # Return the value from partition metadata if an Identity Transform exists for the field and the partition
    # value is present in the partition struct on data_file object in the manifest. This allows for metadata
    # only migrations of Hive tables."
    warehouse_path = os.path.dirname(os.path.realpath(__file__))
    namespace_name = "IDENTITY_PARTITION_SCAN_ISSUE_NAMESPACE"
    table_name = "IDENTITY_PARTITION_SCAN_ISSUE"

    catalog = get_iceberg_catalog(warehouse_path)
    drop_catalog_entities_for_test(catalog, namespace_name)

    # create sample hive files
    sample_hive_parquet_file = create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, 202412)

    # catalog existing hive data in iceberg
    catalog.create_namespace(namespace_name)
    table = create_iceberg_table(catalog, namespace_name, table_name)
    print("Hive parquet data:\n", pq.read_table(sample_hive_parquet_file.get("location")))
    print()
    add_data_file(table, sample_hive_parquet_file, table.metadata.default_spec_id)

    # the partition_id column should have values from the metadata, not null, in this output
    # this same iceberg metadata correctly returns the partition_id column values in spark, athena, and snowflake
    print("Table partitions:\n", table.inspect.partitions().to_pandas())
    print()
    print("Table scan:\n", table.scan().to_arrow())
    print()


def get_iceberg_catalog(warehouse_path):
    # using sqlite catalog on local filesystem for demo
    catalog = SqlCatalog(
        "default",
        **{
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        })
    return catalog


def drop_catalog_entities_for_test(catalog, namespace_name):
    if namespace_name in [n[0] for n in catalog.list_namespaces()]:
        for _, table_name in catalog.list_tables(namespace_name):
            catalog.drop_table(f"{namespace_name}.{table_name}")
        catalog.drop_namespace(namespace_name)


def create_sample_hive_parquet_file(warehouse_path, namespace_name, table_name, partition_id):
    location = f"{warehouse_path}/{namespace_name}.db/{table_name}/data/partition_id={partition_id}/data.parquet"
    os.makedirs(os.path.dirname(location), exist_ok=True)
    name = datetime.datetime.strptime(str(partition_id), "%Y%m").strftime("%B %Y")
    names = pa.array([name], type=pa.string())
    pq.write_table(pa.table([names], names=["name"]), location)
    return {
        "location": location,
        "file_size": os.path.getsize(location),
        "partition_id": partition_id
    }


def create_iceberg_table(catalog, namespace_name, table_name):
    print("creating iceberg table")
    schema = Schema(
        NestedField(field_id=1, name="partition_id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False))
    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_id"))
    table = catalog.create_table(
        f"{namespace_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()})
    return table


def add_data_file(table, hive_data_file, spec_id):
    print("adding data file")
    parquet_metadata = pq.read_metadata(hive_data_file.get("location"))
    stats_columns = compute_statistics_plan(table.schema(), table.metadata.properties)
    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=stats_columns,
        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()))
    data_file = DataFile(
        content=DataFileContent.DATA,
        file_path=hive_data_file.get("location"),
        file_format=FileFormat.PARQUET,
        partition=Record(partition_id=hive_data_file.get("partition_id")),
        file_size_in_bytes=hive_data_file.get("file_size"),
        sort_order_id=None,
        spec_id=spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict())
    with table.transaction() as tx:
        with tx.update_snapshot().overwrite() as update_snapshot:
            update_snapshot.append_data_file(data_file)


if __name__ == "__main__":
    demonstrate_identity_partition_scan_issue()
```

And here's the output:

```
creating iceberg table
Hive parquet data:
 pyarrow.Table
name: string
----
name: [["December 2024"]]

adding data file
Table partitions:
                    partition  spec_id  record_count  ...  equality_delete_file_count          last_updated_at  last_updated_snapshot_id
0  {'partition_id': 202412}          0             1  ...                           0  2024-12-06 18:37:00.672       4125530822203270775

[1 rows x 11 columns]

Table scan:
 pyarrow.Table
partition_id: int32
name: large_string
----
partition_id: [[null]]
name: [["December 2024"]]
```

The issue is that the `IdentityTransform` partition column `partition_id` is present in the partition metadata (`202412`), but comes back as `null` in the table scan.
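To illustrate the projection rule quoted in the reproducer, here is a rough, hand-rolled sketch of what that metadata-based projection could look like on top of `plan_files()`. It is only a workaround sketch under the assumptions of the example above (a single identity partition field named `partition_id` of integer type, read positionally from the manifest's partition `Record`); it is not how pyiceberg implements its scan path:

```
import pyarrow as pa
import pyarrow.parquet as pq


def scan_with_projected_partition(table):
    # Read each planned data file and attach the identity-partition value
    # recorded in the manifest, since the column is absent from the Parquet file.
    pieces = []
    for task in table.scan().plan_files():
        file_table = pq.read_table(task.file.file_path)
        # task.file.partition is the partition Record from the manifest entry;
        # position 0 is assumed to be the identity-transformed partition_id value.
        partition_value = task.file.partition[0]
        file_table = file_table.append_column(
            pa.field("partition_id", pa.int32()),
            pa.array([partition_value] * len(file_table), type=pa.int32()),
        )
        pieces.append(file_table)
    return pa.concat_tables(pieces)
```

For the example above this would yield `partition_id: [[202412]]` alongside `name`, which is what the scan itself should return once the identity-transform projection from the spec is applied.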