[GitHub] [iceberg] Gschiavon commented on issue #5946: Not able to run spark procedure rewrite_data_files
Gschiavon commented on issue #5946: URL: https://github.com/apache/iceberg/issues/5946#issuecomment-1367851261 A quick workaround for this is to add the package via `--packages` when doing spark-submit, like this: `--packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.1.0` I also had to add `--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp`, in case someone runs into the same error on k8s: `Exception in thread "main" java.io.FileNotFoundException: /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-785919a8-9943-4aa2-8694-d58267a93470-1.0.xml (No such file or directory)` Another workaround would be to bake the Iceberg package into the Docker image.
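For readers driving Spark from Python rather than spark-submit, a rough equivalent of the same workaround might look like the sketch below. The catalog name `my_catalog` and table `db.events` are placeholders, and the runtime coordinates assume Spark 3.3 / Scala 2.13 as in the comment above.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("rewrite-data-files-workaround")
    # Equivalent of --packages: pull in the Iceberg Spark runtime at startup
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.1.0")
    # Equivalent of the extraJavaOptions workaround: point Ivy at a writable directory on k8s
    .config("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
    .getOrCreate()
)

# The procedure that fails without the runtime jar on the classpath
spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'db.events')").show()
```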
[GitHub] [iceberg] Fokko commented on a diff in pull request #6500: Aws: Cosmetic change and simplify statusCode check in GlueTableOperations
Fokko commented on code in PR #6500: URL: https://github.com/apache/iceberg/pull/6500#discussion_r1059366839 ## aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java: ## @@ -184,9 +185,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (persistFailure instanceof AwsServiceException) { int statusCode = ((AwsServiceException) persistFailure).statusCode(); -if (statusCode >= 500 && statusCode < 600) { - commitStatus = CommitStatus.FAILURE; -} else { +if (!(statusCode >= 500 && statusCode < 600)) { Review Comment: For me, this is harder to read, because of the additional negation.
[GitHub] [iceberg] cccs-eric commented on pull request #6497: Python: Move `adlfs` import inline
cccs-eric commented on PR #6497: URL: https://github.com/apache/iceberg/pull/6497#issuecomment-1367932332 @Fokko yeah, after I wrote that message, I started with a fresh venv and couldn't make it work without installing `pyiceberg[s3fs]`. I remember seeing other places (tests, docs, ...) in the project where S3 was assumed. Might be worthwhile to do a full search for `S3` and check the references.
[GitHub] [iceberg] rdblue merged pull request #6504: Python: Add tests
rdblue merged PR #6504: URL: https://github.com/apache/iceberg/pull/6504
[GitHub] [iceberg] rdblue commented on a diff in pull request #6501: Python: Use PyArrow buffer
rdblue commented on code in PR #6501: URL: https://github.com/apache/iceberg/pull/6501#discussion_r1059492469 ## python/pyiceberg/io/pyarrow.py: ## @@ -151,7 +160,7 @@ def open(self) -> InputStream: an AWS error code 15 """ try: -input_file = self._filesystem.open_input_file(self._path) +input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size) Review Comment: I think that we need to have a flag to signal when `seek` will not be called. The protocol includes seek and we need to support it for Parquet files. However, we can hint to Arrow that we don't need to seek. How about adding `seekable=True` to `open` and allowing the caller to use this by setting `seekable=False`?
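A minimal sketch of what that suggestion could look like on top of PyArrow, assuming the `_buffer_size` attribute from the diff above; the `seekable` parameter name follows the comment and is not necessarily the final pyiceberg API:

```python
from pyarrow.fs import LocalFileSystem


class PyArrowInputFile:
    def __init__(self, path: str, buffer_size: int = 8 * 1024 * 1024):
        self._path = path
        self._buffer_size = buffer_size
        self._filesystem = LocalFileSystem()

    def open(self, seekable: bool = True):
        if seekable:
            # Random-access handle: Parquet needs to seek to the footer and to individual pages
            return self._filesystem.open_input_file(self._path)
        # Buffered, forward-only stream: enough for files read front to back (e.g. Avro manifests)
        return self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
```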
[GitHub] [iceberg] rdblue merged pull request #6497: Python: Move `adlfs` import inline
rdblue merged PR #6497: URL: https://github.com/apache/iceberg/pull/6497
[GitHub] [iceberg] rdblue commented on pull request #6495: Python-legacy: Fix CI
rdblue commented on PR #6495: URL: https://github.com/apache/iceberg/pull/6495#issuecomment-1368051275 Looks good to me. Thanks for fixing it. When do you think it will be time to remove the legacy code? I think we're just about at the point where we can do everything the legacy code can.
[GitHub] [iceberg] rdblue merged pull request #6495: Python-legacy: Fix CI
rdblue merged PR #6495: URL: https://github.com/apache/iceberg/pull/6495
[GitHub] [iceberg] rdblue commented on a diff in pull request #6485: API: New KMS Client Interface
rdblue commented on code in PR #6485: URL: https://github.com/apache/iceberg/pull/6485#discussion_r1059493412 ## api/src/main/java/org/apache/iceberg/encryption/KmsClient.java: ## @@ -22,7 +22,8 @@ import java.nio.ByteBuffer; import java.util.Map; -/** A minimum client interface to connect to a key management service (KMS). */ +/** @deprecated A minimum client interface to connect to a key management service (KMS). */ Review Comment: This should also state when the API will be removed (in 2.0.0).
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059495752 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done Review Comment: `ResolveError` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059496480 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) Review Comment: Minor: we should detect when the Iceberg schema is missing and raise a friendly exception that says the file can't be read because it doesn't have an Iceberg schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
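A minimal sketch of the friendlier check, built from pieces already used in the diff above (`ICEBERG_SCHEMA` as the Parquet key/value metadata key and `Schema.parse_raw`); the helper name is made up for illustration:

```python
from typing import Optional

from pyiceberg.schema import Schema

ICEBERG_SCHEMA = b"iceberg.schema"  # key looked up in the Parquet key/value metadata


def iceberg_schema_from_metadata(metadata: Optional[dict], path: str) -> Schema:
    schema_raw = metadata.get(ICEBERG_SCHEMA) if metadata else None
    if schema_raw is None:
        # Fail with a readable message instead of an obscure parse error downstream
        raise ValueError(f"Cannot read file {path}: no Iceberg schema found in the Parquet metadata")
    return Schema.parse_raw(schema_raw)
```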
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059496896 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) Review Comment: Looks like the name `bound_row_filter` is reused, which is going to cause problems because it leaks the filter for the last iteration into the current iteration. Probably best to use `bound_file_filter` or something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059497186 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) Review Comment: For cases where the file does not contain a column that is being filtered, does binding to the file schema throw an exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059497682 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +465,198 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) Review Comment: Okay, so this is because of the use of `select_full_types=False`. I think that this works since it is using `schema.field_ids` but we may want to move this to a helper method eventually, rather than embedding it here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
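A hedged sketch of the helper hinted at here, using only pieces the diff already relies on (`Schema.field_ids` and `Schema.find_type`); the function name is hypothetical:

```python
from typing import Set

from pyiceberg.schema import Schema
from pyiceberg.types import ListType, MapType


def projected_top_level_field_ids(projected_schema: Schema) -> Set[int]:
    """Field IDs to request from the file, skipping list and map types as project_table does inline."""
    return {
        field_id
        for field_id in projected_schema.field_ids
        if not isinstance(projected_schema.find_type(field_id), (ListType, MapType))
    }
```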
[GitHub] [iceberg] krvikash commented on a diff in pull request #6500: Aws: Cosmetic change in GlueTableOperations
krvikash commented on code in PR #6500: URL: https://github.com/apache/iceberg/pull/6500#discussion_r1059499681 ## aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java: ## @@ -184,9 +185,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (persistFailure instanceof AwsServiceException) { int statusCode = ((AwsServiceException) persistFailure).statusCode(); -if (statusCode >= 500 && statusCode < 600) { - commitStatus = CommitStatus.FAILURE; -} else { +if (!(statusCode >= 500 && statusCode < 600)) { Review Comment: Thanks, @Fokko for your review. I have reverted this change now.
[GitHub] [iceberg] Fokko commented on pull request #6495: Python-legacy: Fix CI
Fokko commented on PR #6495: URL: https://github.com/apache/iceberg/pull/6495#issuecomment-1368078960 Thanks for merging it. It doesn't feel like it's up to me to remove the old code. Now and then I still look at it for inspiration, but I agree that most of the functionality is in PyIceberg by now.
[GitHub] [iceberg] Fokko commented on a diff in pull request #6501: Python: Use PyArrow buffer
Fokko commented on code in PR #6501: URL: https://github.com/apache/iceberg/pull/6501#discussion_r1059512957 ## python/pyiceberg/io/pyarrow.py: ## @@ -151,7 +160,7 @@ def open(self) -> InputStream: an AWS error code 15 """ try: -input_file = self._filesystem.open_input_file(self._path) +input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size) Review Comment: That's a great point. Since it is part of the protocol, we should be able to use `seek`. I've added the `seekable` option to the `.open` method.
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059515855 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: +return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) + +return values + +def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: +return struct_result + +def struct( +self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] +) -> Optional[pa.Array]: +if struct_array is None: +return None +return pa.StructArray.from_arrays(arrays=field_results, fields=pa.struct(schema_to_pyarrow(struct))) + +def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: +if field_array is not None: +return self.cast_if_needed(field, field_array)
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059515919 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done Review Comment: Good one, also updated the one in `schema.py` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059515855 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: +return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) + +return values + +def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: +return struct_result + +def struct( +self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] +) -> Optional[pa.Array]: +if struct_array is None: +return None +return pa.StructArray.from_arrays(arrays=field_results, fields=pa.struct(schema_to_pyarrow(struct))) + +def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: +if field_array is not None: +return self.cast_if_needed(field, field_array)
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059516096 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: Review Comment: This needs to be updated to check whether the expected type is a primitive. Otherwise, when you have nested structs or other complex types, it fails because `promote` doesn't support them. ```python def cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self.file_schema.find_field(field.field_id) if field.field_type.is_primitive and field.field_type != file_field.field_type: return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this s
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059516096 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: Review Comment: This needs to be updated to check whether the expected type is a primitive. Otherwise, when you have nested structs or other complex types, it fails because `promote` doesn't support them. ```python def cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self.file_schema.find_field(field.field_id) if field.field_type.is_primitive and field.field_type != file_field.field_type: return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) ``` I caught this when testing the nested list case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-
[GitHub] [iceberg] Fokko opened a new issue, #6505: Infer Iceberg schema from the Parquet file
Fokko opened a new issue, #6505: URL: https://github.com/apache/iceberg/issues/6505 ### Feature Request / Improvement In PyIceberg we rely on fetching the schema from the Parquet metadata. If this is not available (because the Parquet file was written by something other than an Iceberg writer), we want to walk the actual Parquet schema and construct the Iceberg schema from it. ### Query engine None
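A rough sketch of the idea, not the eventual pyiceberg implementation: walk the PyArrow schema and assign fresh field IDs. Only a handful of primitive types are mapped here, and the ID-assignment strategy is an assumption for illustration.

```python
import pyarrow as pa

from pyiceberg.schema import Schema
from pyiceberg.types import (
    BooleanType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    NestedField,
    StringType,
)

# Mapping from a few common Arrow primitives to their Iceberg counterparts
_PRIMITIVES = {
    pa.bool_(): BooleanType(),
    pa.int32(): IntegerType(),
    pa.int64(): LongType(),
    pa.float32(): FloatType(),
    pa.float64(): DoubleType(),
    pa.string(): StringType(),
}


def infer_iceberg_schema(arrow_schema: pa.Schema) -> Schema:
    fields = []
    for pos, field in enumerate(arrow_schema, start=1):
        if field.type not in _PRIMITIVES:
            raise ValueError(f"Type not handled in this sketch: {field.type}")
        fields.append(
            NestedField(field_id=pos, name=field.name, field_type=_PRIMITIVES[field.type], required=not field.nullable)
        )
    return Schema(*fields, schema_id=0)
```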
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059516392 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) Review Comment: Good suggestion, the current error is really obscure. I've added a check and created a ticket https://github.com/apache/iceberg/issues/6505 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059516697 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) Review Comment: Auch, normally a type checker would also complain, but the types are the same. Thanks for catching it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059516840 ## python/pyiceberg/schema.py: ## @@ -1046,3 +1055,79 @@ def _project_map(map_type: MapType, value_result: IcebergType) -> MapType: value_type=value_result, value_required=map_type.value_required, ) + + +@singledispatch +def promote(file_type: IcebergType, read_type: IcebergType) -> IcebergType: +"""Promotes reading a file type to a read type + +Args: +file_type (IcebergType): The type of the Avro file +read_type (IcebergType): The requested read type + +Raises: +ResolveException: If attempting to resolve an unrecognized object type +""" +raise ResolveException(f"Cannot promote {file_type} to {read_type}") + + +@promote.register(IntegerType) +def _(file_type: IntegerType, read_type: IcebergType) -> IcebergType: +if isinstance(read_type, LongType): +# Ints/Longs are binary compatible in Avro, so this is okay +return read_type +else: +raise ResolveException(f"Cannot promote an int to {read_type}") + + +@promote.register(FloatType) +def _(file_type: FloatType, read_type: IcebergType) -> IcebergType: +if isinstance(read_type, DoubleType): +# A double type is wider +return read_type +else: +raise ResolveException(f"Cannot promote an float to {read_type}") + + +@promote.register(StringType) +def _(file_type: StringType, read_type: IcebergType) -> IcebergType: +if isinstance(read_type, BinaryType): +return read_type +else: +raise ResolveException(f"Cannot promote an string to {read_type}") + + +@promote.register(BinaryType) +def _(file_type: BinaryType, read_type: IcebergType) -> IcebergType: +if isinstance(read_type, StringType): +return read_type +else: +raise ResolveException(f"Cannot promote an binary to {read_type}") + + +@promote.register(DecimalType) +def _(file_type: DecimalType, read_type: IcebergType) -> IcebergType: +if isinstance(read_type, DecimalType): +if file_type.precision <= read_type.precision and file_type.scale == file_type.scale: +return read_type +else: +raise ResolveException(f"Cannot reduce precision from {file_type} to {read_type}") +else: +raise ResolveException(f"Cannot promote an decimal to {read_type}") + + +@promote.register(StructType) +def _(file_type: StructType, read_type: IcebergType) -> IcebergType: Review Comment: I don't think that we want to do this. For nested structures, visitors should handle the logic. Type promotion should just be for primitive types. I handled this in the code above by checking whether the type was primitive before calling. I think that's better than trying to add logic to implement promotion, which is not defined for structs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
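A short usage sketch of `promote` as discussed, assuming the singledispatch version from the diff above lives in `pyiceberg/schema.py`: it only handles primitive widenings, which is why the caller checks `is_primitive` before invoking it.

```python
from pyiceberg.schema import promote
from pyiceberg.types import DoubleType, FloatType, IntegerType, LongType

print(promote(IntegerType(), LongType()))   # LongType(): ints may be widened to longs
print(promote(FloatType(), DoubleType()))   # DoubleType(): floats may be widened to doubles
# promote(StructType(...), StructType(...)) would raise the ResolveException from the diff:
# promotion is left undefined for nested types; visitors handle those instead.
```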
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059517229 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) Review Comment: That's a good question. 
Currently it throws: ``` _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ test_pyarrow.py:740: in project return project_table( ../../pyiceberg/io/pyarrow.py:495: in project_table bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) ../../pyiceberg/expressions/visitors.py:203: in bind return visit(expression, BindVisitor(schema, case_sensitive)) /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/functools.py:877: in wrapper return dispatch(args[0].__class__)(*args, **kw) ../../pyiceberg/expressions/visitors.py:175: in _ return visitor.visit_unbound_predicate(predicate=obj) ../../pyiceberg/expressions/visitors.py:240: in visit_unbound_predicate return predicate.bind(self.schema, case_sensitive=self.case_sensitive) ../../pyiceberg/expressions/__init__.py:615: in bind bound_term = self.term.bind(schema, case_sensitive) ../../pyiceberg/expressions/__init__.py:180: in bind field = schema.find_field(name_or_id=self.name, case_sensitive=case_sensitive) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = Schema(NestedField(field_id=1, name='id', field_type=IntegerType(), required=False), NestedField(field_id=2, name='data', field_type=StringType(), required=False), schema_id=0, identifier_field_ids=[]) name_or_id = 'unknown_field', case_sensitive = True def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> NestedField: """Find a field using a field name or field ID Args: name_or_id (str | int): Either a field name or a field ID case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True. Raises: ValueError: When the value cannot be found Returns: NestedField: The matched NestedField """ if isinstance(name_or_id, int): if name_or_id not in self._lazy_id_to_field: raise ValueError(f"Could not find field with id: {name_or_id}") return self._lazy_id_to_field[name_or_id] if case_sensitive: field_id = self._name_to_id.get(name_or_id) else: field_id = self._lazy_name_to_id_lower.get(name_or_id.lower()) if field_id is None: > raise ValueError(f"Could not
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059517783 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: +return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) + +return values + +def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: +return struct_result + +def struct( +self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] +) -> Optional[pa.Array]: +if struct_array is None: +return None +return pa.StructArray.from_arrays(arrays=field_results, fields=pa.struct(schema_to_pyarrow(struct))) + +def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: +if field_array is not None: +return self.cast_if_needed(field, field_array)
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059518245 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: Review Comment: This is a part of my latest PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059518886 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: Review Comment: Ah I see, this also allows us to remove the struct from the promotion. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
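For illustration only (not code from the PR): a minimal pyarrow sketch of what the `cast_if_needed` promotion above amounts to for a primitive field, assuming an `int` file column being read as a `long`. The plain `pa.int32()`/`pa.int64()` types stand in for `schema_to_pyarrow(promote(...))`.

```python
import pyarrow as pa

# What the Parquet file holds (file schema: IntegerType).
file_values = pa.array([1, 2, 3], type=pa.int32())
# What the requested schema asks for (LongType), i.e. the promoted type.
requested_type = pa.int64()

# Cast only when the file type differs from the requested type.
values = file_values.cast(requested_type) if file_values.type != requested_type else file_values
assert values.type == pa.int64()
assert values.to_pylist() == [1, 2, 3]
```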
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059519176 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +468,170 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) +}.union(extract_field_ids(bound_row_filter)) + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + +if file_schema is None: +raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + +# Prune the stuff that we don't need anyway +file_project_schema_arrow = schema_to_pyarrow(file_project_schema) + +arrow_table = ds.dataset( +source=[path], schema=file_project_schema_arrow, format=ds.ParquetFileFormat(), filesystem=fs +).to_table(filter=pyarrow_filter) + +tables.append(to_requested_schema(projected_schema, file_project_schema, arrow_table)) + +if len(tables) > 1: +return pa.concat_tables(tables) +else: +return tables[0] + + +def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: +struct_array = visit_with_partner( +requested_schema, table, ArrowProjectionVisitor(file_schema, len(table)), ArrowAccessor(file_schema) +) + +arrays = [] +fields = [] +for pos, field in enumerate(requested_schema.fields): +array = struct_array.field(pos) +arrays.append(array) +fields.append(pa.field(field.name, array.type, field.optional)) +return pa.Table.from_arrays(arrays, schema=pa.schema(fields)) + + +class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): +file_schema: Schema +table_length: int + +def __init__(self, file_schema: Schema, table_length: int): +self.file_schema = file_schema +self.table_length = table_length + +def cast_if_needed(self, field: NestedField, values: pa.Array) -> 
pa.Array: +file_field = self.file_schema.find_field(field.field_id) +if field.field_type != file_field.field_type: +return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) + +return values + +def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: +return struct_result + +def struct( +self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] +) -> Optional[pa.Array]: +if struct_array is None: +return None +return pa.StructArray.from_arrays(arrays=field_results, fields=pa.struct(schema_to_pyarrow(struct))) + +def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: +if field_array is not None: +return self.cast_if_needed(field, field_array)
[GitHub] [iceberg] rdblue commented on a diff in pull request #6490: Python: Replace Pydantic with StructRecord
rdblue commented on code in PR #6490: URL: https://github.com/apache/iceberg/pull/6490#discussion_r1059522065 ## python/pyiceberg/manifest.py: ## @@ -76,137 +66,283 @@ def __repr__(self) -> str: return f"FileFormat.{self.name}" -class DataFile(IcebergBaseModel): -content: DataFileContent = Field(default=DataFileContent.DATA) -file_path: str = Field() -file_format: FileFormat = Field() -partition: Dict[str, Any] = Field() -record_count: int = Field() -file_size_in_bytes: int = Field() -block_size_in_bytes: Optional[int] = Field() -column_sizes: Optional[Dict[int, int]] = Field() -value_counts: Optional[Dict[int, int]] = Field() -null_value_counts: Optional[Dict[int, int]] = Field() -nan_value_counts: Optional[Dict[int, int]] = Field() -distinct_counts: Optional[Dict[int, int]] = Field() -lower_bounds: Optional[Dict[int, bytes]] = Field() -upper_bounds: Optional[Dict[int, bytes]] = Field() -key_metadata: Optional[bytes] = Field() -split_offsets: Optional[List[int]] = Field() -equality_ids: Optional[List[int]] = Field() -sort_order_id: Optional[int] = Field() - - -class ManifestEntry(IcebergBaseModel): -status: ManifestEntryStatus = Field() -snapshot_id: Optional[int] = Field() -sequence_number: Optional[int] = Field() -data_file: DataFile = Field() - - -class PartitionFieldSummary(IcebergBaseModel): -contains_null: bool = Field() -contains_nan: Optional[bool] = Field() -lower_bound: Optional[bytes] = Field() -upper_bound: Optional[bytes] = Field() - - -class ManifestFile(IcebergBaseModel): -manifest_path: str = Field() -manifest_length: int = Field() -partition_spec_id: int = Field() -content: ManifestContent = Field(default=ManifestContent.DATA) -sequence_number: int = Field(default=0) -min_sequence_number: int = Field(default=0) -added_snapshot_id: Optional[int] = Field() -added_data_files_count: Optional[int] = Field() -existing_data_files_count: Optional[int] = Field() -deleted_data_files_count: Optional[int] = Field() -added_rows_count: Optional[int] = Field() -existing_rows_counts: Optional[int] = Field() -deleted_rows_count: Optional[int] = Field() -partitions: Optional[List[PartitionFieldSummary]] = Field() -key_metadata: Optional[bytes] = Field() - -def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]: +class DataFile(Record): +@staticmethod +def from_record(record: Record, format_version: int) -> Union[DataFileV1, DataFileV2]: +if format_version == 1: +return DataFileV1(*record) +elif format_version == 2: +return DataFileV2(*record) +else: +raise ValueError(f"Unknown format-version: {format_version}") + +file_path: str +file_format: FileFormat +partition: Record +record_count: int +file_size_in_bytes: int +block_size_in_bytes: Optional[int] = None +column_sizes: Optional[Dict[int, int]] = None +value_counts: Optional[Dict[int, int]] = None +null_value_counts: Optional[Dict[int, int]] = None +nan_value_counts: Optional[Dict[int, int]] = None +distinct_counts: Optional[Dict[int, int]] = None # Does not seem to be used on the Java side!? +lower_bounds: Optional[Dict[int, bytes]] = None +upper_bounds: Optional[Dict[int, bytes]] = None +key_metadata: Optional[bytes] = None +split_offsets: Optional[List[int]] = None +equality_ids: Optional[List[int]] = None +sort_order_id: Optional[int] = None +content: DataFileContent = DataFileContent.DATA + + +class DataFileV1(DataFile): +def __setitem__(self, pos: int, value: Any) -> None: Review Comment: I think that these methods need to be dynamic based on the schema that is used to read the file. 
That's why we have a mapping between the canonical (full schema) positions and the positions from the expected schema: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseFile.java#L240-L242 That would also make it so you can have just one `DataFile` class because v1 is just a projection of v2. There are a couple of version-specific classes in Java ([V1Metadata](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/V1Metadata.java) and [V2Metadata](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/V2Metadata.java)), but those are used only for writing, not for reading. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
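A hedged sketch of the position-mapping idea described above, not the actual pyiceberg or Java implementation: a single `DataFile`-like class keeps values in canonical (v2) slots, and a per-read mapping translates positions from whatever schema the Avro file was read with, so a v1 file is just a projection of the v2 layout. The field names below are abbreviated for illustration.

```python
from typing import Any, Dict, List

# Abbreviated canonical (v2) field order, for illustration only.
V2_FIELD_NAMES: List[str] = [
    "content", "file_path", "file_format", "partition", "record_count",
    "file_size_in_bytes", "column_sizes", "value_counts", "lower_bounds",
    "upper_bounds", "key_metadata", "split_offsets", "equality_ids", "sort_order_id",
]


class DataFileSketch:
    """One class for v1 and v2: values live in canonical slots, positions are remapped per read schema."""

    def __init__(self, read_field_names: List[str]) -> None:
        self._values: Dict[str, Any] = {name: None for name in V2_FIELD_NAMES}
        # Position in the schema used to read the file -> canonical field name.
        self._pos_to_name: Dict[int, str] = dict(enumerate(read_field_names))

    def __setitem__(self, pos: int, value: Any) -> None:
        self._values[self._pos_to_name[pos]] = value

    def __getitem__(self, pos: int) -> Any:
        return self._values[self._pos_to_name[pos]]


# A v1 read schema has no `content` field, so position 0 maps to `file_path`.
v1_file = DataFileSketch(read_field_names=["file_path", "file_format", "partition", "record_count"])
v1_file[0] = "s3://bucket/data/file.parquet"
assert v1_file._values["file_path"] == "s3://bucket/data/file.parquet"
```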
[GitHub] [iceberg] Fokko commented on a diff in pull request #6437: Python: Projection by Field ID
Fokko commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1059522716 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +465,198 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def project_table( +files: Iterable[FileScanTask], table: Table, row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +"""Resolves the right columns based on the identifier + +Args: +files(Iterable[FileScanTask]): A URI or a path to a local file +table(Table): The table that's being queried +row_filter(BooleanExpression): The expression for filtering rows +projected_schema(Schema): The output schema +case_sensitive(bool): Case sensitivity when looking up column names + +Raises: +ResolveException: When an incompatible query is done +""" + +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive) + +projected_field_ids = { +id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) Review Comment: The question is then if we want to return full structs. If the requested schema is: ``` location: struct ``` Of the table schema: ``` location: struct ``` With the `select_full_types=True` it would return: ``` location: struct ``` Instead of just lat. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
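To make the `select_full_types` trade-off concrete, here is a hedged sketch that exercises `prune_columns` directly (assuming, as the diff above suggests, that it is importable from `pyiceberg.schema`); the `location`/`lat`/`long` field names and types are made up for illustration.

```python
from pyiceberg.schema import Schema, prune_columns
from pyiceberg.types import DoubleType, NestedField, StructType

table_schema = Schema(
    NestedField(
        field_id=1,
        name="location",
        field_type=StructType(
            NestedField(field_id=2, name="lat", field_type=DoubleType(), required=False),
            NestedField(field_id=3, name="long", field_type=DoubleType(), required=False),
        ),
        required=False,
    ),
    schema_id=1,
)

# Project field id 2 ("location.lat"); id 1 is the enclosing struct itself.
selected = {1, 2}

# Per the discussion above: with select_full_types=False only location.lat survives,
# while select_full_types=True would pull in the whole struct (lat and long).
print(prune_columns(table_schema, selected, select_full_types=False))
print(prune_columns(table_schema, selected, select_full_types=True))
```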
[GitHub] [iceberg] rdblue commented on a diff in pull request #6490: Python: Replace Pydantic with StructRecord
rdblue commented on code in PR #6490: URL: https://github.com/apache/iceberg/pull/6490#discussion_r1059524353 ## python/pyiceberg/manifest.py: ## @@ -76,137 +66,283 @@ def __repr__(self) -> str: return f"FileFormat.{self.name}" -class DataFile(IcebergBaseModel): -content: DataFileContent = Field(default=DataFileContent.DATA) -file_path: str = Field() -file_format: FileFormat = Field() -partition: Dict[str, Any] = Field() -record_count: int = Field() -file_size_in_bytes: int = Field() -block_size_in_bytes: Optional[int] = Field() -column_sizes: Optional[Dict[int, int]] = Field() -value_counts: Optional[Dict[int, int]] = Field() -null_value_counts: Optional[Dict[int, int]] = Field() -nan_value_counts: Optional[Dict[int, int]] = Field() -distinct_counts: Optional[Dict[int, int]] = Field() -lower_bounds: Optional[Dict[int, bytes]] = Field() -upper_bounds: Optional[Dict[int, bytes]] = Field() -key_metadata: Optional[bytes] = Field() -split_offsets: Optional[List[int]] = Field() -equality_ids: Optional[List[int]] = Field() -sort_order_id: Optional[int] = Field() - - -class ManifestEntry(IcebergBaseModel): -status: ManifestEntryStatus = Field() -snapshot_id: Optional[int] = Field() -sequence_number: Optional[int] = Field() -data_file: DataFile = Field() - - -class PartitionFieldSummary(IcebergBaseModel): -contains_null: bool = Field() -contains_nan: Optional[bool] = Field() -lower_bound: Optional[bytes] = Field() -upper_bound: Optional[bytes] = Field() - - -class ManifestFile(IcebergBaseModel): -manifest_path: str = Field() -manifest_length: int = Field() -partition_spec_id: int = Field() -content: ManifestContent = Field(default=ManifestContent.DATA) -sequence_number: int = Field(default=0) -min_sequence_number: int = Field(default=0) -added_snapshot_id: Optional[int] = Field() -added_data_files_count: Optional[int] = Field() -existing_data_files_count: Optional[int] = Field() -deleted_data_files_count: Optional[int] = Field() -added_rows_count: Optional[int] = Field() -existing_rows_counts: Optional[int] = Field() -deleted_rows_count: Optional[int] = Field() -partitions: Optional[List[PartitionFieldSummary]] = Field() -key_metadata: Optional[bytes] = Field() - -def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]: +class DataFile(Record): +@staticmethod +def from_record(record: Record, format_version: int) -> Union[DataFileV1, DataFileV2]: +if format_version == 1: +return DataFileV1(*record) +elif format_version == 2: +return DataFileV2(*record) +else: +raise ValueError(f"Unknown format-version: {format_version}") + +file_path: str +file_format: FileFormat +partition: Record +record_count: int +file_size_in_bytes: int +block_size_in_bytes: Optional[int] = None +column_sizes: Optional[Dict[int, int]] = None +value_counts: Optional[Dict[int, int]] = None +null_value_counts: Optional[Dict[int, int]] = None +nan_value_counts: Optional[Dict[int, int]] = None +distinct_counts: Optional[Dict[int, int]] = None # Does not seem to be used on the Java side!? 
+lower_bounds: Optional[Dict[int, bytes]] = None +upper_bounds: Optional[Dict[int, bytes]] = None +key_metadata: Optional[bytes] = None +split_offsets: Optional[List[int]] = None +equality_ids: Optional[List[int]] = None +sort_order_id: Optional[int] = None +content: DataFileContent = DataFileContent.DATA + + +class DataFileV1(DataFile): +def __setitem__(self, pos: int, value: Any) -> None: +if pos == 0: +self.file_path = value +elif pos == 1: +self.file_format = value +elif pos == 2: +self.partition = value +elif pos == 3: +self.record_count = value +elif pos == 4: +self.file_size_in_bytes = value +elif pos == 5: +self.block_size_in_bytes = value +elif pos == 6: +self.column_sizes = value +elif pos == 7: +self.value_counts = value +elif pos == 8: +self.null_value_counts = value +elif pos == 9: +self.nan_value_counts = value +elif pos == 10: +self.lower_bounds = value +elif pos == 11: +self.upper_bounds = value +elif pos == 12: +self.key_metadata = value +elif pos == 13: +self.split_offsets = value +elif pos == 14: +self.sort_order_id = value + + +class DataFileV2(DataFile): +def __setitem__(self, pos: int, value: Any) -> None: +if pos == 0: +self.content = value +elif pos == 1: +self.file_path = value +elif pos == 2: +self.file_format = value +elif
[GitHub] [iceberg] rdblue commented on a diff in pull request #6490: Python: Replace Pydantic with StructRecord
rdblue commented on code in PR #6490: URL: https://github.com/apache/iceberg/pull/6490#discussion_r1059524606 ## python/pyiceberg/typedef.py: ## @@ -85,16 +86,24 @@ class Record(StructProtocol): def __init__(self, *data: Union[Any, StructProtocol]) -> None: self._data = list(data) +for idx, value in enumerate(data): +self[idx] = value -def set(self, pos: int, value: Any) -> None: +def __setitem__(self, pos: int, value: Any) -> None: Review Comment: I like allowing the use of `__setitem__` and `__getitem__` but do we want to make a breaking change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
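If avoiding the breaking change matters, one option (a sketch, not the PR's code) is to add `__setitem__`/`__getitem__` while keeping the existing `set`/`get` methods as thin aliases, so current `StructProtocol` callers keep working:

```python
from typing import Any, List


class Record:
    def __init__(self, *data: Any) -> None:
        self._data: List[Any] = list(data)

    def __setitem__(self, pos: int, value: Any) -> None:
        self._data[pos] = value

    def __getitem__(self, pos: int) -> Any:
        return self._data[pos]

    # Backwards-compatible aliases for the pre-existing API.
    def set(self, pos: int, value: Any) -> None:
        self[pos] = value

    def get(self, pos: int) -> Any:
        return self[pos]


r = Record("a", "b")
r.set(0, "c")   # old call style still works
r[1] = "d"      # new indexing style
assert (r[0], r.get(1)) == ("c", "d")
```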
[GitHub] [iceberg] rdblue commented on a diff in pull request #6490: Python: Replace Pydantic with StructRecord
rdblue commented on code in PR #6490: URL: https://github.com/apache/iceberg/pull/6490#discussion_r1059524781 ## python/tests/expressions/test_visitors.py: ## @@ -827,85 +840,91 @@ def manifest_no_stats() -> ManifestFile: return _to_manifest_file() +def _PartitionFieldSummary( +contains_null: bool, contains_nan: Optional[bool], lower_bound: Optional[bytes], upper_bound: Optional[bytes] +) -> PartitionFieldSummary: +return PartitionFieldSummary.from_record(Record(contains_null, contains_nan, lower_bound, upper_bound)) + + @pytest.fixture def manifest() -> ManifestFile: return _to_manifest_file( # id -PartitionFieldSummary( +_PartitionFieldSummary( Review Comment: Is it worth adding the `_` prefix? For records from the spec, I'd probably omit it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko merged pull request #6437: Python: Projection by Field ID
Fokko merged PR #6437: URL: https://github.com/apache/iceberg/pull/6437 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on pull request #6437: Python: Projection by Field ID
Fokko commented on PR #6437: URL: https://github.com/apache/iceberg/pull/6437#issuecomment-1368105150 Thanks for the review @rdblue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #5183: Allow to configure Avro block size
github-actions[bot] commented on issue #5183: URL: https://github.com/apache/iceberg/issues/5183#issuecomment-1368130469 This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns
github-actions[bot] commented on issue #5000: URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1368130488 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue opened a new pull request, #6506: Python: Refactor Avro read path to use a partner visitor
rdblue opened a new pull request, #6506: URL: https://github.com/apache/iceberg/pull/6506 This refactors the Avro read path to use the newly introduced `SchemaWithPartnerVisitor`. The purpose of this is to make the resolver a little more standard and make it easy to inject types that implement `StructProtocol` for the readers to use. Since the readers are constructed in several places (that all create a new `ConstructReader`), this refactor is needed. I didn't quite finish this work. The next step is to remove `ConstructReader` and move all reader construction into `SchemaResolver`, then to pass a map of struct constructors into the resolver. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
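As a rough illustration of the "map of struct constructors" follow-up described in the PR text (purely a sketch; names, keys, and reader shapes here are assumptions, not the PR's API): the resolver could look up a `StructProtocol` implementation per struct and fall back to a generic `Record`, instead of `ConstructReader` always deciding what to instantiate.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class Record:
    """Stand-in for a generic StructProtocol implementation."""

    def __init__(self, size: int) -> None:
        self._data: List[Any] = [None] * size

    def __setitem__(self, pos: int, value: Any) -> None:
        self._data[pos] = value


# Hypothetical: constructors keyed by struct name; a real version might key by schema or field id.
StructConstructor = Callable[[int], Record]


def struct_reader_for(
    struct_name: str,
    field_readers: List[Tuple[Optional[int], Callable[[Any], Any]]],
    constructors: Dict[str, StructConstructor],
) -> Callable[[Any], Record]:
    """Build a reader that fills the registered struct type; fields with position None are read and dropped."""
    new_struct = constructors.get(struct_name, Record)
    size = sum(1 for pos, _ in field_readers if pos is not None)

    def read(decoder: Any) -> Record:
        struct = new_struct(size)
        for pos, field_reader in field_readers:
            value = field_reader(decoder)
            if pos is not None:
                struct[pos] = value
        return struct

    return read


class ManifestEntryStruct(Record):
    """A concrete struct the resolver should instantiate for manifest entries."""


read_entry = struct_reader_for(
    "manifest_entry",
    field_readers=[(0, lambda _: 1), (None, lambda _: "skipped"), (1, lambda _: "file.parquet")],
    constructors={"manifest_entry": ManifestEntryStruct},
)
entry = read_entry(decoder=None)
assert isinstance(entry, ManifestEntryStruct) and entry._data == [1, "file.parquet"]
```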
[GitHub] [iceberg] rdblue commented on a diff in pull request #6506: Python: Refactor Avro read path to use a partner visitor
rdblue commented on code in PR #6506: URL: https://github.com/apache/iceberg/pull/6506#discussion_r1059559551 ## python/pyiceberg/avro/resolver.py: ## @@ -57,80 +61,91 @@ def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, Raises: NotImplementedError: If attempting to resolve an unrecognized object type """ -raise NotImplementedError(f"Cannot resolve non-type: {file_schema}") +return visit_with_partner(file_schema, read_schema, SchemaResolver(), SchemaPartnerAccessor()) + + +class SchemaResolver(SchemaWithPartnerVisitor[IcebergType, Reader]): +def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader: +return result + +def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader: +if expected_struct and not isinstance(expected_struct, StructType): +raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}") + +results: List[Tuple[Optional[int], Reader]] = [] +expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)} + +# first, add readers for the file fields that must be in order +for field, result_reader in zip(struct.fields, field_readers): +read_pos = expected_positions.get(field.field_id) +results.append((read_pos, result_reader)) + +file_fields = {field.field_id: field for field in struct.fields} +for pos, read_field in enumerate(expected_struct.fields): +if read_field.field_id not in file_fields: +if read_field.required: +raise ResolveError(f"{read_field} is non-optional, and not part of the file schema") +# Just set the new field to None +results.append((pos, NoneReader())) + +return StructReader(tuple(results)) + +def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader: +return field_reader + +def list(self, list_type: ListType, expected_list: Optional[IcebergType], element_reader: Reader) -> Reader: +if expected_list and not isinstance(expected_list, ListType): +raise ResolveError(f"File/read schema are not aligned for list, got {expected_list}") +return ListReader(element_reader) -@resolve.register(Schema) -def _(file_schema: Schema, read_schema: Schema) -> Reader: -"""Visit a Schema and starts resolving it by converting it to a struct""" -return resolve(file_schema.as_struct(), read_schema.as_struct()) +def map(self, map_type: MapType, expected_map: Optional[IcebergType], key_reader: Reader, value_reader: Reader) -> Reader: +if expected_map and not isinstance(expected_map, MapType): +raise ResolveError(f"File/read schema are not aligned for map, got {expected_map}") +return MapReader(key_reader, value_reader) -@resolve.register(StructType) -def _(file_struct: StructType, read_struct: IcebergType) -> Reader: -"""Iterates over the file schema, and checks if the field is in the read schema""" +def primitive(self, primitive: PrimitiveType, expected_primitive: Optional[IcebergType]) -> Reader: +if expected_primitive is not None: +if not isinstance(expected_primitive, PrimitiveType): +raise ResolveError(f"File/read schema are not aligned for {primitive}, got {expected_primitive}") -if not isinstance(read_struct, StructType): -raise ResolveError(f"File/read schema are not aligned for {file_struct}, got {read_struct}") +# ensure that the type can be projected to the expected +if primitive != expected_primitive: +promote(primitive, expected_primitive) -results: List[Tuple[Optional[int], Reader]] = [] -read_fields = {field.field_id: 
(pos, field) for pos, field in enumerate(read_struct.fields)} +return visit(primitive, ConstructReader()) -for file_field in file_struct.fields: -if file_field.field_id in read_fields: -read_pos, read_field = read_fields[file_field.field_id] -result_reader = resolve(file_field.field_type, read_field.field_type) + +class SchemaPartnerAccessor(PartnerAccessor[IcebergType]): +def schema_partner(self, partner: Optional[IcebergType]) -> Optional[IcebergType]: +if isinstance(partner, Schema): +return partner.as_struct() + +raise ResolveError(f"File/read schema are not aligned for schema, got {partner}") + +def field_partner(self, partner: Optional[IcebergType], field_id: int, field_name: str) -> Optional[IcebergType]: +if isinstance(partner, StructType): +field = partner.field(field_id) else: -read_pos = None -result_reader = visit(file_f