Fokko commented on code in PR #921:
URL: https://github.com/apache/iceberg-python/pull/921#discussion_r1676911616

##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(

Review Comment:
   Nit: this naming is left over from when we only used this on the read path; we should probably make it more generic. Maybe `provided_schema` and `requested_schema`? Open to suggestions!

##########
tests/integration/test_add_files.py:
##########
@@ -501,14 +501,11 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
     )

     expected = """Mismatch in fields:
-┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓
-┃    ┃ Table field              ┃ Dataframe field          ┃
-┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩
-│ ✅ │ 1: foo: optional boolean │ 1: foo: optional boolean │
-| ✅ │ 2: bar: optional string  │ 2: bar: optional string  │
-│ ❌ │ 3: baz: optional int     │ 3: baz: optional string  │
-│ ✅ │ 4: qux: optional date    │ 4: qux: optional date    │
-└────┴──────────────────────────┴──────────────────────────┘
+┏━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓

Review Comment:
   Is it just me, or is the left one easier to read? 😅

##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(

Review Comment:
   I'm debating whether this API is the most extensible. I think we should re-use `_check_schema_compatible(requested_schema: Schema, provided_schema: Schema)` instead of reimplementing the logic on the Arrow side here. That nicely splits `pyarrow_to_schema` from `_check_schema_compatible`.
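   A rough sketch of that split, for illustration — `_check_pyarrow_schema_compatible` is a hypothetical name here, and only `_pyarrow_to_schema_without_ids` / `assign_fresh_schema_ids` / `_check_schema_compatible` come from the diff and the suggestion above:

   ```python
   import pyarrow as pa

   from pyiceberg.schema import Schema, assign_fresh_schema_ids

   # Assumes this lives in pyiceberg/io/pyarrow.py, next to the private helpers
   # _pyarrow_to_schema_without_ids and the Schema-level _check_schema_compatible.
   def _check_pyarrow_schema_compatible(
       requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
   ) -> None:
       # Conversion concern: turn the Arrow schema into an Iceberg Schema first.
       provided = assign_fresh_schema_ids(
           _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
       )
       # Comparison concern: pure Schema-vs-Schema logic, reusable by any
       # write path without touching Arrow again.
       _check_schema_compatible(requested_schema, provided)
   ```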
""" - name_mapping = table_schema.name_mapping - try: - task_schema = pyarrow_to_schema( - other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us - ) - except ValueError as e: - other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) - additional_names = set(other_schema.column_names) - set(table_schema.column_names) - raise ValueError( - f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." - ) from e - - if table_schema.as_struct() != task_schema.as_struct(): - from rich.console import Console - from rich.table import Table as RichTable + task_schema = assign_fresh_schema_ids( + _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) + ) - console = Console(record=True) + extra_fields = task_schema.field_names - table_schema.field_names + missing_fields = table_schema.field_names - task_schema.field_names + fields_in_both = task_schema.field_names.intersection(table_schema.field_names) + + from rich.console import Console + from rich.table import Table as RichTable + + console = Console(record=True) + + rich_table = RichTable(show_header=True, header_style="bold") + rich_table.add_column("Field Name") + rich_table.add_column("Category") + rich_table.add_column("Table field") + rich_table.add_column("Dataframe field") + + def print_nullability(required: bool) -> str: + return "required" if required else "optional" + + for field_name in fields_in_both: + lhs = table_schema.find_field(field_name) + rhs = task_schema.find_field(field_name) + # Check nullability + if lhs.required != rhs.required: + rich_table.add_row( + field_name, + "Nullability", + f"{print_nullability(lhs.required)} {str(lhs.field_type)}", + f"{print_nullability(rhs.required)} {str(rhs.field_type)}", + ) + # Check if type is consistent + if any( + (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type)) + for container_type in {StructType, MapType, ListType} + ): + continue + elif lhs.field_type != rhs.field_type: + rich_table.add_row( + field_name, + "Type", + f"{print_nullability(lhs.required)} {str(lhs.field_type)}", + f"{print_nullability(rhs.required)} {str(rhs.field_type)}", + ) Review Comment: I think it would be good to get this to work as well. It should be pretty easy by just first upcasting the buffers before writing. ########## pyiceberg/io/pyarrow.py: ########## @@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down Raises: ValueError: If the schemas are not compatible. """ - name_mapping = table_schema.name_mapping - try: - task_schema = pyarrow_to_schema( - other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us - ) - except ValueError as e: - other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) - additional_names = set(other_schema.column_names) - set(table_schema.column_names) - raise ValueError( - f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." 

##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2082,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )

-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:

Review Comment:
   This is a good point; it could be solved using the `SchemaWithPartnerVisitor`.
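   For the record, a hand-rolled sketch of what such a visitor would formalize — recursing into nested types in lockstep rather than only comparing top-level fields (`_types_compatible` is a hypothetical helper; it also checks the `ListType` element nullability raised further down):

   ```python
   from pyiceberg.types import IcebergType, ListType, MapType, StructType

   def _types_compatible(lhs: IcebergType, rhs: IcebergType) -> bool:
       if isinstance(lhs, StructType) and isinstance(rhs, StructType):
           # Pairs fields positionally for brevity; real code would pair them
           # by name/ID, which is what the visitor machinery gives you.
           return len(lhs.fields) == len(rhs.fields) and all(
               lf.required == rf.required and _types_compatible(lf.field_type, rf.field_type)
               for lf, rf in zip(lhs.fields, rhs.fields)
           )
       if isinstance(lhs, ListType) and isinstance(rhs, ListType):
           # The list element carries its own nullability.
           return lhs.element_required == rhs.element_required and _types_compatible(lhs.element_type, rhs.element_type)
       if isinstance(lhs, MapType) and isinstance(rhs, MapType):
           return (
               lhs.value_required == rhs.value_required
               and _types_compatible(lhs.key_type, rhs.key_type)
               and _types_compatible(lhs.value_type, rhs.value_type)
           )
       # Primitives: exact match (no promotion handling in this sketch).
       return lhs == rhs
   ```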

##########
pyiceberg/io/pyarrow.py:
##########
@@ -1999,7 +2003,6 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT

     def write_parquet(task: WriteTask) -> DataFile:
         table_schema = task.schema
-

Review Comment:
   nit: unrelated change

##########
pyiceberg/io/pyarrow.py:
##########
@@ -2079,36 +2083,63 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    name_mapping = table_schema.name_mapping
-    try:
-        task_schema = pyarrow_to_schema(
-            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-    except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
-        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
-        raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
-        ) from e
-
-    if table_schema.as_struct() != task_schema.as_struct():
-        from rich.console import Console
-        from rich.table import Table as RichTable
+    task_schema = assign_fresh_schema_ids(
+        _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    )

-        console = Console(record=True)
+    extra_fields = task_schema.field_names - table_schema.field_names
+    missing_fields = table_schema.field_names - task_schema.field_names
+    fields_in_both = task_schema.field_names.intersection(table_schema.field_names)
+
+    from rich.console import Console
+    from rich.table import Table as RichTable
+
+    console = Console(record=True)
+
+    rich_table = RichTable(show_header=True, header_style="bold")
+    rich_table.add_column("Field Name")
+    rich_table.add_column("Category")
+    rich_table.add_column("Table field")
+    rich_table.add_column("Dataframe field")
+
+    def print_nullability(required: bool) -> str:
+        return "required" if required else "optional"
+
+    for field_name in fields_in_both:
+        lhs = table_schema.find_field(field_name)
+        rhs = task_schema.find_field(field_name)
+        # Check nullability
+        if lhs.required != rhs.required:
+            rich_table.add_row(
+                field_name,
+                "Nullability",
+                f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
+                f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
+            )
+        # Check if type is consistent
+        if any(
+            (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type))
+            for container_type in {StructType, MapType, ListType}

Review Comment:
   The `ListType` itself also carries nullability (for its element). I don't think that's covered here.

##########
pyiceberg/io/pyarrow.py:
##########
@@ -1450,14 +1451,17 @@ def field_partner(self, partner_struct: Optional[pa.Array], field_id: int, _: st
         except ValueError:
             return None

-        if isinstance(partner_struct, pa.StructArray):
-            return partner_struct.field(name)
-        elif isinstance(partner_struct, pa.Table):
-            return partner_struct.column(name).combine_chunks()
-        elif isinstance(partner_struct, pa.RecordBatch):
-            return partner_struct.column(name)
-        else:
-            raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
+        try:
+            if isinstance(partner_struct, pa.StructArray):
+                return partner_struct.field(name)
+            elif isinstance(partner_struct, pa.Table):
+                return partner_struct.column(name).combine_chunks()
+            elif isinstance(partner_struct, pa.RecordBatch):
+                return partner_struct.column(name)
+            else:
+                raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
+        except KeyError:

Review Comment:
   Above we have the `file_schema`, which should correspond with the `partner_struct`. I'd expect the field-id lookup to already `return None`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org