Fokko commented on code in PR #921:
URL: https://github.com/apache/iceberg-python/pull/921#discussion_r1676911616


##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(

Review Comment:
   Nit: this naming is left over from when we only used this at the read path; we should probably make it more generic. Maybe `provided_schema` and `requested_schema`? Open to suggestions!



##########
tests/integration/test_add_files.py:
##########
@@ -501,14 +501,11 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
             )
 
     expected = """Mismatch in fields:
-┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓
-┃    ┃ Table field              ┃ Dataframe field          ┃
-┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩
-│ ✅ │ 1: foo: optional boolean │ 1: foo: optional boolean │
-│ ✅ │ 2: bar: optional string  │ 2: bar: optional string  │
-│ ❌ │ 3: baz: optional int     │ 3: baz: optional string  │
-│ ✅ │ 4: qux: optional date    │ 4: qux: optional date    │
-└────┴──────────────────────────┴──────────────────────────┘
+┏━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓

Review Comment:
   Is it just me, or is the left easier to read? 😅 



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(

Review Comment:
   I'm debating whether this API is the most extensible option. I think we should re-use `_check_schema_compatible(requested_schema: Schema, provided_schema: Schema)` instead of reimplementing the logic in Arrow here. That nicely separates `pyarrow_to_schema` from `_check_schema_compatible`.
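   Roughly, the split could look like this (a sketch; `_check_pyarrow_schema_compatible` is a hypothetical name, and the comparison body is elided):
   ```python
   import pyarrow as pa

   from pyiceberg.io.pyarrow import pyarrow_to_schema
   from pyiceberg.schema import Schema


   def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
       """Compare two Iceberg schemas; raise ValueError if incompatible."""
       ...  # pure Iceberg-to-Iceberg comparison, no Arrow types involved


   def _check_pyarrow_schema_compatible(requested_schema: Schema, provided_schema: pa.Schema) -> None:
       # The Arrow conversion is isolated here; the check itself stays Arrow-free.
       converted = pyarrow_to_schema(provided_schema, name_mapping=requested_schema.name_mapping)
       _check_schema_compatible(requested_schema, converted)
   ```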



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2083,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:
+            rich_table.add_row(
+                field_name,
+                "Nullability",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )
+        # Check if type is consistent
+        if any(
+            (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+            for container_type in {StructType, MapType, ListType}
+        ):
+            continue
+        elif lhs.field_type != rhs.field_type:
+            rich_table.add_row(
+                field_name,
+                "Type",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )

Review Comment:
   I think it would be good to get this to work as well. It should be pretty easy: just upcast the buffers before writing.
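   A minimal sketch of that idea (assuming an `int` -> `long` promotion; the column name `baz` comes from the test above, the rest is illustrative):
   ```python
   import pyarrow as pa

   # The dataframe arrives with a narrower type than the Iceberg column.
   table = pa.table({"baz": pa.array([1, 2, 3], type=pa.int32())})

   # Upcast the buffers before writing: int32 -> int64 mirrors Iceberg's
   # int -> long type promotion and is lossless.
   upcasted = table.cast(pa.schema([pa.field("baz", pa.int64())]))
   assert upcasted.schema.field("baz").type == pa.int64()
   ```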



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
    """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:

Review Comment:
   This is a good point; it could be solved using the `SchemaWithPartnerVisitor`.
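   A skeleton of that approach (hypothetical visitor; `SchemaWithPartnerVisitor` and its callbacks are the real pyiceberg API, the compatibility rules here are placeholders):
   ```python
   from typing import List, Optional

   from pyiceberg.schema import Schema, SchemaWithPartnerVisitor
   from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType


   class CompatibilityVisitor(SchemaWithPartnerVisitor[IcebergType, bool]):
       """Visits the table schema with the provided schema as its partner."""

       def schema(self, schema: Schema, partner: Optional[IcebergType], struct_result: bool) -> bool:
           return struct_result

       def struct(self, struct: StructType, partner: Optional[IcebergType], field_results: List[bool]) -> bool:
           return all(field_results)

       def field(self, field: NestedField, partner: Optional[IcebergType], field_result: bool) -> bool:
           # A missing partner is only acceptable for an optional field.
           return field_result if partner is not None else not field.required

       def list(self, list_type: ListType, partner: Optional[IcebergType], element_result: bool) -> bool:
           return element_result

       def map(self, map_type: MapType, partner: Optional[IcebergType], key_result: bool, value_result: bool) -> bool:
           return key_result and value_result

       def primitive(self, primitive: PrimitiveType, partner: Optional[IcebergType]) -> bool:
           return partner == primitive
   ```
   Paired with a `PartnerAccessor` that resolves fields by name and driven by `visit_with_partner`, this recurses into nested structs, lists, and maps instead of stopping at top-level field names.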



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1999,7 +2003,6 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
 
     def write_parquet(task: WriteTask) -> DataFile:
         table_schema = task.schema
-

Review Comment:
   nit: unrelated change



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2083,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:
+            rich_table.add_row(
+                field_name,
+                "Nullability",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )
+        # Check if type is consistent
+        if any(
+            (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+            for container_type in {StructType, MapType, ListType}

Review Comment:
   The `ListType` itself also carries nullability (its element can be required or optional). I don't think that's covered here.
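   A small illustration of the gap (types are arbitrary): two `ListType`s that differ only in element nullability compare unequal, yet the container-type `continue` above skips the comparison whenever both sides are lists.
   ```python
   from pyiceberg.types import IntegerType, ListType

   required_elements = ListType(element_id=1, element_type=IntegerType(), element_required=True)
   optional_elements = ListType(element_id=1, element_type=IntegerType(), element_required=False)

   # Element nullability makes these two list types differ, but the
   # isinstance short-circuit above never gets this far.
   assert required_elements != optional_elements
   ```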



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1450,14 +1451,17 @@ def field_partner(self, partner_struct: Optional[pa.Array], field_id: int, _: st
             except ValueError:
                 return None
 
-            if isinstance(partner_struct, pa.StructArray):
-                return partner_struct.field(name)
-            elif isinstance(partner_struct, pa.Table):
-                return partner_struct.column(name).combine_chunks()
-            elif isinstance(partner_struct, pa.RecordBatch):
-                return partner_struct.column(name)
-            else:
-                raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
+            try:
+                if isinstance(partner_struct, pa.StructArray):
+                    return partner_struct.field(name)
+                elif isinstance(partner_struct, pa.Table):
+                    return partner_struct.column(name).combine_chunks()
+                elif isinstance(partner_struct, pa.RecordBatch):
+                    return partner_struct.column(name)
+                else:
+                    raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
+            except KeyError:

Review Comment:
   Above we have the `file_schema`, which should correspond with the `partner_struct`. I'd expect the field-id lookup to already `return None` when the field is absent.
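   A standalone rendering of that expectation (a hypothetical free function; the real code is a method that also handles `pa.Table` and `pa.RecordBatch`):
   ```python
   from typing import Optional

   import pyarrow as pa

   from pyiceberg.schema import Schema


   def field_partner(file_schema: Schema, partner_struct: pa.StructArray, field_id: int) -> Optional[pa.Array]:
       # Absence is decided here: if the field-id is not in file_schema there is
       # no partner column, so the field access below should never raise KeyError.
       try:
           name = file_schema.find_field(field_id).name
       except ValueError:
           return None
       return partner_struct.field(name)
   ```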



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

