syun64 commented on code in PR #921:
URL: https://github.com/apache/iceberg-python/pull/921#discussion_r1676751929


##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2083,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:
+            rich_table.add_row(
+                field_name,
+                "Nullability",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )
+        # Check if type is consistent
+        if any(
+            (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+            for container_type in {StructType, MapType, ListType}
+        ):
+            continue
+        elif lhs.field_type != rhs.field_type:
+            rich_table.add_row(
+                field_name,
+                "Type",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )

Review Comment:
   Hi @HonahX - I tried this out, and I think we may benefit from scoping this out of this PR and investing some more time to figure out the correct way to support type promotions. The exception I'm getting is as follows:
   
   ```
   tests/integration/test_writes/utils.py:79: in _create_table
       tbl.append(d)
   pyiceberg/table/__init__.py:1557: in append
       tx.append(df=df, snapshot_properties=snapshot_properties)
   pyiceberg/table/__init__.py:503: in append
       for data_file in data_files:
   pyiceberg/io/pyarrow.py:2252: in _dataframe_to_data_files
       yield from write_file(
   /usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:621: in result_iterator
       yield _result_or_cancel(fs.pop())
   /usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:319: in _result_or_cancel
       return fut.result(timeout)
   /usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:458: in result
       return self.__get_result()
   /usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
       raise self._exception
   /usr/local/python/3.10.13/lib/python3.10/concurrent/futures/thread.py:58: in run
       result = self.fn(*self.args, **self.kwargs)
   pyiceberg/io/pyarrow.py:2030: in write_parquet
       statistics = data_file_statistics_from_parquet_metadata(
   pyiceberg/io/pyarrow.py:1963: in data_file_statistics_from_parquet_metadata
       col_aggs[field_id] = StatsAggregator(
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   self = <pyiceberg.io.pyarrow.StatsAggregator object at 0x787561b422c0>, iceberg_type = LongType(), physical_type_string = 'INT32', trunc_length = None
   
       def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None:
           self.current_min = None
           self.current_max = None
           self.trunc_length = trunc_length
   
           expected_physical_type = _primitive_to_physical(iceberg_type)
           if expected_physical_type != physical_type_string:
   >           raise ValueError(
                   f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
               )
   E           ValueError: Unexpected physical type INT32 for long, expected INT64
   
   pyiceberg/io/pyarrow.py:1556: ValueError
   ```
   
   This is because the `file_schema` passed to `_to_requested_schema` in the `write_parquet` function is just the Iceberg table schema, rather than a Schema representation of the PyArrow table's actual data types. So when the types of the `file_schema` and the `requested_schema` are compared, both sides are the Iceberg table type (e.g. LongType) instead of the smaller PyArrow type in the dataframe (e.g. IntegerType).
   
   I think this is going to take a bit of work: we need to use a schema that actually represents the data types within the Arrow dataframe, and that Schema representation of the PyArrow schema also has to carry field_ids consistent with the Iceberg table schema, because the `ArrowProjectionVisitor` uses field_ids to look up fields in the `file_schema`.
   
   I'd like to continue this discussion out of scope of this release, but I think we will have to decide on one of the following two approaches:
   1. We write with the compatible smaller parquet types (e.g. use INT32 for a LongType) and fix the [StatsAggregator](https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L1550) to handle different physical types.
   2. We update the `file_schema` input to `_to_requested_schema` in `write_parquet` so that we [upcast the arrow data type](https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L2014) and write the larger expected physical types into the parquet file (see the sketch after this list).
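   
   As an illustration of approach 2, here's a rough sketch (the helper name is hypothetical, and it assumes the dataframe columns already line up with the table schema): cast the Arrow table to the Arrow equivalent of the Iceberg schema before writing, so the parquet physical types match what the stats code expects.
   
   ```python
   import pyarrow as pa
   
   from pyiceberg.io.pyarrow import schema_to_pyarrow
   from pyiceberg.schema import Schema
   
   def upcast_to_table_schema(df: pa.Table, table_schema: Schema) -> pa.Table:
       # Derive the Arrow schema the Iceberg table expects
       # (e.g. LongType -> pa.int64()) and cast the dataframe to it,
       # promoting smaller types such as int32 up to int64.
       return df.cast(schema_to_pyarrow(table_schema))
   ```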
   
   Long story short, our underlying plumbing doesn't yet support `promote` on write, and there's still some work left for us to get there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

