syun64 commented on code in PR #921:
URL: https://github.com/apache/iceberg-python/pull/921#discussion_r1676751929
##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2083,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )

-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:
+            rich_table.add_row(
+                field_name,
+                "Nullability",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )
+        # Check if type is consistent
+        if any(
+            (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+            for container_type in {StructType, MapType, ListType}
+        ):
+            continue
+        elif lhs.field_type != rhs.field_type:
+            rich_table.add_row(
+                field_name,
+                "Type",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )

Review Comment:
   Hi @HonahX - I tried this out, and I think we may benefit from scoping this out of this PR and investing some more time to figure out the correct way to support type promotions on write.
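For concreteness, here's a minimal sketch of the kind of append I was exercising (the catalog name, table name, and schema below are illustrative, not the actual integration test): an Arrow column typed `int32` written into an Iceberg field declared as `long`, which the relaxed compatibility check now lets through to `write_parquet`.

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Illustrative catalog/table names only.
catalog = load_catalog("default")
tbl = catalog.create_table(
    "default.promotion_on_write",
    schema=Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=False)),
)

# The Arrow data is int32 while the Iceberg field is long. The schema check
# treats this as a compatible (promotable) pair, but since _to_requested_schema
# sees the Iceberg type on both sides it never upcasts the array, so the
# Parquet column is written with physical type INT32.
df = pa.table({"id": pa.array([1, 2, 3], type=pa.int32())})
tbl.append(df)  # raises the ValueError from StatsAggregator shown below
```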
The exception I'm getting is as follows:
```
tests/integration/test_writes/utils.py:79: in _create_table
    tbl.append(d)
pyiceberg/table/__init__.py:1557: in append
    tx.append(df=df, snapshot_properties=snapshot_properties)
pyiceberg/table/__init__.py:503: in append
    for data_file in data_files:
pyiceberg/io/pyarrow.py:2252: in _dataframe_to_data_files
    yield from write_file(
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:621: in result_iterator
    yield _result_or_cancel(fs.pop())
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:319: in _result_or_cancel
    return fut.result(timeout)
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:458: in result
    return self.__get_result()
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
    raise self._exception
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
pyiceberg/io/pyarrow.py:2030: in write_parquet
    statistics = data_file_statistics_from_parquet_metadata(
pyiceberg/io/pyarrow.py:1963: in data_file_statistics_from_parquet_metadata
    col_aggs[field_id] = StatsAggregator(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pyiceberg.io.pyarrow.StatsAggregator object at 0x787561b422c0>, iceberg_type = LongType(), physical_type_string = 'INT32', trunc_length = None

    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None:
        self.current_min = None
        self.current_max = None
        self.trunc_length = trunc_length

        expected_physical_type = _primitive_to_physical(iceberg_type)
        if expected_physical_type != physical_type_string:
>           raise ValueError(
                f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
            )
E           ValueError: Unexpected physical type INT32 for long, expected INT64

pyiceberg/io/pyarrow.py:1556: ValueError
```
This happens because the `file_schema` passed to `_to_requested_schema` in the `write_parquet` function is just the Iceberg table schema, rather than a `Schema` representation of the PyArrow table's actual data types. So when the types of `file_schema` and `requested_schema` are compared, both sides are the Iceberg table type (e.g. LongType) instead of the smaller type actually in the dataframe (e.g. IntegerType for an int32 column), and no upcast is performed.

I think this is going to take a bit of work: we need to use a schema that actually represents the data types inside the Arrow dataframe, and that schema also has to carry field_ids consistent with the Iceberg table schema, because `ArrowProjectionVisitor` uses field_ids to look fields up against the `file_schema`.

I'd like to continue this discussion outside the scope of this release, but I think we will have to decide on one of the following two approaches:

1. Write with the compatible smaller Parquet physical types (e.g. INT32 for a LongType field) and fix the [StatsAggregator](https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L1550) to handle the different physical types.
2. Update the `file_schema` input to `_to_requested_schema` in `write_parquet` so that we [upcast the Arrow data type](https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L2014) and write the larger expected physical types into the Parquet file (rough sketch of this below).

Long story short, our underlying plumbing doesn't yet support `promote` on write, and there's still some work left for us to get there.
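For option 2, the rough idea would be to cast the Arrow data up to the Arrow representation of the Iceberg table schema before it reaches the Parquet writer, so the physical types already match what `StatsAggregator` expects. Just a sketch, not how I'd necessarily wire it into `write_parquet`; the helper name is made up, only `schema_to_pyarrow` and `Table.cast` are real APIs:

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema


def upcast_to_table_schema(arrow_table: pa.Table, table_schema: Schema) -> pa.Table:
    """Sketch: upcast the Arrow data to the table schema's Arrow types
    (e.g. int32 -> int64 for a long field) before writing Parquet.

    Assumes the column names and ordering already line up with the Iceberg
    schema; the real fix also needs field_ids consistent with the table
    schema so ArrowProjectionVisitor can do its lookups against file_schema.
    """
    target = schema_to_pyarrow(table_schema)
    # Table.cast performs the widening conversions Arrow considers safe.
    return arrow_table.cast(target)
```

This would keep `StatsAggregator` untouched but always write the promoted physical type, whereas option 1 would instead teach `StatsAggregator` to accept the smaller physical representation.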