kevinjqliu commented on code in PR #921:
URL: https://github.com/apache/iceberg-python/pull/921#discussion_r1676880923


##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, 
other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - 
set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', 
'.join(sorted(additional_names))}. Update the schema first (hint, use 
union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = 
task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:

Review Comment:
   Just want to check my understanding: this works for nested fields because 
nested fields are "flattened" by `.field_names` and then fetched by 
`.find_field`.
   
   For example: a `df` schema like
   ```
   task_schema = pa.field(
       "person",
       pa.struct([
           pa.field("name", pa.string(), nullable=True),
       ]),
       nullable=True,
   )
   ```
   
   `task_schema.field_names` will produce `{"person", "person.name"}`. 
   `task_schema.find_field("person")` and 
`task_schema.find_field("person.name")` will fetch the corresponding fields



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, 
other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - 
set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', 
'.join(sorted(additional_names))}. Update the schema first (hint, use 
union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = 
task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:

Review Comment:
   To make this a bit more complicated: 
   an optional field can be written to by a required field (the schema is 
"widened"), 
   but a required field cannot be written to by an optional field, because it 
cannot deal with null values.



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, 
other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - 
set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', 
'.join(sorted(additional_names))}. Update the schema first (hint, use 
union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )
 
-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = 
task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:

Review Comment:
   similar to below with type promotion, we can deal with this later! 



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1450,14 +1451,17 @@ def field_partner(self, partner_struct: 
Optional[pa.Array], field_id: int, _: st
             except ValueError:
                 return None
 
-            if isinstance(partner_struct, pa.StructArray):
-                return partner_struct.field(name)
-            elif isinstance(partner_struct, pa.Table):
-                return partner_struct.column(name).combine_chunks()
-            elif isinstance(partner_struct, pa.RecordBatch):
-                return partner_struct.column(name)
-            else:
-                raise ValueError(f"Cannot find {name} in expected 
partner_struct type {type(partner_struct)}")
+            try:
+                if isinstance(partner_struct, pa.StructArray):
+                    return partner_struct.field(name)
+                elif isinstance(partner_struct, pa.Table):
+                    return partner_struct.column(name).combine_chunks()
+                elif isinstance(partner_struct, pa.RecordBatch):
+                    return partner_struct.column(name)
+                else:
+                    raise ValueError(f"Cannot find {name} in expected 
partner_struct type {type(partner_struct)}")
+            except KeyError:

Review Comment:
   Is this change responsible for schema projection / writing a subset of the 
schema? Do you mind expanding on the mechanism behind how this works? I'm 
curious



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to