[GitHub] [iceberg] deadwind4 opened a new issue, #6429: [Feature Proposal] Log Store in Iceberg
deadwind4 opened a new issue, #6429: URL: https://github.com/apache/iceberg/issues/6429

### Feature Request / Improvement

This proposal aims to improve Iceberg's real-time capability by introducing a log store system: streaming reads would consume data cached in a log store (Kafka).

Proposal document: https://docs.google.com/document/d/1Zy4bbvSuN9ax7Tq4TuxL2vi4xQ4DR4wbR4QK5m69qQo/edit?usp=sharing
Code demo: https://github.com/deadwind4/iceberg/tree/log-store

### Query engine

None
[GitHub] [iceberg] Fokko opened a new issue, #6430: Python: Support for static table
Fokko opened a new issue, #6430: URL: https://github.com/apache/iceberg/issues/6430

### Feature Request / Improvement

In Java we have the StaticTableOperations: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/StaticTableOperations.java

It allows the user to load a table directly from a metadata location. This static table does not support any modifications. I think this would be valuable for PyIceberg as well, as it will enable users to easily pull in data or share datasets.

### Query engine

None
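For reference, loading a table from a fixed metadata file with the Java `StaticTableOperations` mentioned above looks roughly like the sketch below. This is a minimal, hedged example: the metadata path is a placeholder, and `HadoopFileIO` is just one possible `FileIO` implementation.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;

public class StaticTableExample {
  public static void main(String[] args) throws IOException {
    // Placeholder: points at one specific metadata file rather than a catalog entry.
    String metadataLocation = "s3://bucket/warehouse/db/tbl/metadata/00001-example.metadata.json";

    // StaticTableOperations loads the metadata once and does not support commits,
    // so the resulting table is read-only.
    StaticTableOperations ops =
        new StaticTableOperations(metadataLocation, new HadoopFileIO(new Configuration()));
    Table table = new BaseTable(ops, "static-table");

    // Plan a scan against the pinned metadata.
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      tasks.forEach(task -> System.out.println(task.file().path()));
    }
  }
}
```

The issue proposes an equivalent read-only entry point for PyIceberg.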
[GitHub] [iceberg] Fokko commented on issue #3220: [Python] support iceberg hadoop catalog in python library
Fokko commented on issue #3220: URL: https://github.com/apache/iceberg/issues/3220#issuecomment-1352731893

Hey everyone, I think we should split out the idea of implementing the full Hadoop catalog from just being able to read a table from a metadata URL. For the latter, I've created a new issue: https://github.com/apache/iceberg/issues/6430

My apologies for invoking the negative feelings around the HadoopCatalog, and I fully agree that this is something we should avoid in PyIceberg. However, I do see a lot of value in just reading (not writing!) using nothing but a path to the metadata. @srilman I've recently started working on [integration tests](https://github.com/apache/iceberg/pull/6398) that write tables using Spark (since PyIceberg doesn't have any write support), and these are shared using the RestCatalog. Would love to get your thoughts on that!
[GitHub] [iceberg] Fokko commented on issue #6397: Python Instructions currently do not work for testing
Fokko commented on issue #6397: URL: https://github.com/apache/iceberg/issues/6397#issuecomment-1352734967 @rubenvdg What do you think of removing the dataclass from the `AvroStruct`? We should be able to create a Struct without including it in the PyIceberg class hierarchy. The `AvroStruct` is currently frozen as well, but that doesn't make much sense since we also have a `set` method 🤔 😄
[GitHub] [iceberg] zhongyujiang opened a new pull request, #6431: Parquet: Fix ParquetDictionaryRowGroupFilter evaluating NaN.
zhongyujiang opened a new pull request, #6431: URL: https://github.com/apache/iceberg/pull/6431

This PR fixes ParquetDictionaryRowGroupFilter's evaluation of `notNaN`. Because Parquet dictionaries cannot contain null values, ParquetDictionaryRowGroupFilter should check whether there are null values in the column chunk when evaluating `notNaN`, and return `true` if there are.

This also improves looking up the `NaN` value in the dictionary set. In Java, both Double.NaN and Float.NaN are considered equal to themselves (under `equals()`), so:
- when the dictionary's size is greater than 1, it must contain values that are not `NaN`, and we can directly return `true` in that case;
- we can use `Set#contains` to look up `NaN` directly instead of comparing elements in the collection one by one.
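To make the described check concrete, here is a rough, self-contained sketch of the `notNaN` decision over a column chunk's dictionary. It is not the actual patch: the method shape, the `mayContainNulls` flag, and the plain boolean return are simplifications.

```java
import java.util.Set;

class NotNaNSketch {
  // true  -> rows might match, keep the row group
  // false -> rows cannot match, skip the row group
  static boolean notNaN(Set<?> dictionary, boolean mayContainNulls) {
    if (mayContainNulls) {
      // Parquet dictionaries never hold nulls, so any null row already satisfies notNaN.
      return true;
    }

    if (dictionary.size() > 1) {
      // NaN values collapse to a single entry under equals(), so a dictionary with
      // more than one entry necessarily contains a non-NaN value.
      return true;
    }

    // NaN is equal to itself under equals(), so Set#contains can find it directly.
    boolean onlyNaN = dictionary.contains(Double.NaN) || dictionary.contains(Float.NaN);
    return !onlyNaN;
  }
}
```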
[GitHub] [iceberg] zhongyujiang commented on pull request #6431: Parquet: Fix ParquetDictionaryRowGroupFilter evaluating NaN.
zhongyujiang commented on PR #6431: URL: https://github.com/apache/iceberg/pull/6431#issuecomment-1352852748 @yyanyy @rdblue Could you take a look?
[GitHub] [iceberg] rbalamohan opened a new pull request, #6432: Consider moving to ParallelIterable in Deletes::toPositionIndex
rbalamohan opened a new pull request, #6432: URL: https://github.com/apache/iceberg/pull/6432

Issue: https://github.com/apache/iceberg/issues/6387

When tables are updated in "merge-on-read" mode, positional delete files are created. Read performance degrades quite a bit, even with 4+ positional delete files (I tried with TPC-DS queries). Depending on the workload, a data file read may have to process multiple positional delete files to construct the delete positions. This does not sound expensive, but when a large number of medium-sized files are present in a partition, the combined file task ends up with many files. So a task has to process its data files sequentially, and every data file reads multiple positional delete files, causing slowness.

This PR uses `ParallelIterable` in `Deletes::toPositionIndex`. RoaringBitmap isn't thread-safe, so synchronization was added in `BitmapPositionDeleteIndex`. Tried this out in a local cluster and confirmed that it is not expensive.
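As a rough illustration of the thread-safety point, a position delete index backed by a Roaring bitmap can be made safe for concurrent producers by synchronizing its mutators. This is a simplified sketch, not the actual `BitmapPositionDeleteIndex` change, and the specific bitmap class is an assumption.

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

// Sketch: Roaring bitmaps are not thread-safe, so access is synchronized when
// delete positions are added from multiple worker threads.
class SynchronizedPositionDeleteIndex {
  private final Roaring64Bitmap bitmap = new Roaring64Bitmap();

  synchronized void delete(long position) {
    bitmap.addLong(position);
  }

  synchronized boolean isDeleted(long position) {
    return bitmap.contains(position);
  }
}
```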
[GitHub] [iceberg] jaceklaskowski opened a new pull request, #6433: Docs: README
jaceklaskowski opened a new pull request, #6433: URL: https://github.com/apache/iceberg/pull/6433 Found some very minor "issues" while reading README.md and couldn't resist fixing them all up.
[GitHub] [iceberg] cccs-eric commented on a diff in pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
cccs-eric commented on code in PR #6392: URL: https://github.com/apache/iceberg/pull/6392#discussion_r1049565481

## python/Makefile: ##
@@ -26,14 +26,21 @@ lint:
 	poetry run pre-commit run --all-files

 test:
-	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3" ${PYTEST_ARGS}
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3 and not adlfs" ${PYTEST_ARGS}

Review Comment: Yeah, that sounds good. I'll leave my code as-is and if my PR goes in before yours, you'll have to modify this line as a conflict resolution.
[GitHub] [iceberg] grbinho commented on pull request #6223: AWS: Use provided glue catalog id in defaultWarehouseLocation
grbinho commented on PR #6223: URL: https://github.com/apache/iceberg/pull/6223#issuecomment-1352975547 @JonasJ-ap @ajantha-bhat Hi guys, can you advise how we can move this PR forward?
[GitHub] [iceberg] djouallah commented on issue #3220: [Python] support iceberg hadoop catalog in python library
djouallah commented on issue #3220: URL: https://github.com/apache/iceberg/issues/3220#issuecomment-1353002821 For people that use only Python (no Spark, no Glue, none of this big-engine stuff), is there any simple-to-use catalog? I read about this REST thing, but it seems it is only a spec, not an actual piece of software?
[GitHub] [iceberg] djouallah commented on issue #6430: Python: Support for static table
djouallah commented on issue #6430: URL: https://github.com/apache/iceberg/issues/6430#issuecomment-1353005103 FWIW, BigQuery has a *read only* implementation that uses the metadata location.
[GitHub] [iceberg] Fokko commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
Fokko commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353167480 @cccs-eric it looks like we have to add `pyparsing` to the `pyproject.toml`. I don't know why it was working before, but it should have been added in https://github.com/apache/iceberg/pull/6259
[GitHub] [iceberg] joshuarobinson opened a new issue, #6434: PyIceberg support for UUID types
joshuarobinson opened a new issue, #6434: URL: https://github.com/apache/iceberg/issues/6434

### Feature Request / Improvement

Currently, pyiceberg 0.2.0 fails on creating a table scan for any table (that I have at least) with UUID columns. The root problem seems to be (thanks @Fokko for explanation) that UUID is encoded as a string in avro but as a fixed-width type in Iceberg. The failure looks like this:

```
ValueError: Unknown logical/physical type combination: {'type': 'fixed', 'name': 'uuid_fixed', 'size': 16, 'logicalType': 'uuid'}
```

The full stacktrace:

```
Traceback (most recent call last):
  File "/read.py", line 16, in
    print(tbl.scan(selected_fields=("path","device")).to_arrow().to_pandas().head())
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 350, in to_arrow
    for task in self.plan_files():
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in plan_files
    yield from (FileScanTask(file) for file in matching_partition_files)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in
    yield from (FileScanTask(file) for file in matching_partition_files)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 149, in
    return (entry.data_file for entry in live_entries(input_file))
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 145, in
    return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 137, in read_manifest_entry
    with AvroFile(input_file) as reader:
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 136, in __enter__
    self.schema = self.header.get_schema()
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 85, in get_schema
    return AvroSchemaConversion().avro_to_iceberg(avro_schema)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 119, in avro_to_iceberg
    return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 119, in
    return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 227, in _convert_field
    field_type=self._convert_schema(plain_type),
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 196, in _convert_schema
    return self._convert_record_type(avro_type)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in _convert_record_type
    return StructType(*[self._convert_field(field) for field in record_type["fields"]])
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in
    return StructType(*[self._convert_field(field) for field in record_type["fields"]])
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 227, in _convert_field
    field_type=self._convert_schema(plain_type),
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 196, in _convert_schema
    return self._convert_record_type(avro_type)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in _convert_record_type
    return StructType(*[self._convert_field(field) for field in record_type["fields"]])
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in retu
```
[GitHub] [iceberg] Fokko commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
Fokko commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353168942 @cccs-eric I forgot one thing, could you also add the `adlfs` option to the docs in `python/mkdocs/`?
[GitHub] [iceberg] joshuarobinson opened a new issue, #6435: PyIceberg: Avro decode EOF error
joshuarobinson opened a new issue, #6435: URL: https://github.com/apache/iceberg/issues/6435

### Feature Request / Improvement

When reading manifests for a table scan in PyIceberg 0.2.0, I get an EOFError. The table was originally written in June 2022 with the most recent version of Spark/Iceberg at that time (3.2.3 and 0.14.1, I think).

Full traceback of the error:

```
Traceback (most recent call last):
  File "/read.py", line 16, in
    print(tbl.scan(selected_fields=("time", "icao24","call_sign","origin_country")).to_arrow().to_pandas().head())
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 350, in to_arrow
    for task in self.plan_files():
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in plan_files
    yield from (FileScanTask(file) for file in matching_partition_files)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in
    yield from (FileScanTask(file) for file in matching_partition_files)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 149, in
    return (entry.data_file for entry in live_entries(input_file))
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 145, in
    return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 139, in read_manifest_entry
    for record in reader:
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 178, in __next__
    return self.__next__()
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 170, in __next__
    return next(self.block)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 106, in __next__
    return self.reader.read(self.block_decoder)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 283, in read
    result[pos] = field.read(decoder)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 283, in read
    result[pos] = field.read(decoder)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 267, in read
    return self.option.read(decoder)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 329, in read
    read_items[key] = self.value.read(decoder)
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 234, in read
    return decoder.read_bytes()
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/decoder.py", line 122, in read_bytes
    return self.read(self.read_int())
  File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/decoder.py", line 58, in read
    raise EOFError
EOFError
```

The table can be read successfully with Trino. For interest, the schema of my table is:

```
table {
  1: time: optional long
  2: icao24: optional string
  3: callsign: optional string
  4: origin_country: optional string
  5: time_position: optional double
  6: last_contact: optional long
  7: longitude: optional double
  8: latitude: optional double
  9: baro_altitude: optional double
  10: on_ground: optional boolean
  11: velocity: optional double
  12: true_track: optional double
  13: vertical_rate: optional double
  14: geo_altitude: optional double
  15: squawk: optional string
  16: spi: optional boolean
  17: position_source: optional long
}
```

### Query engine

None
[GitHub] [iceberg] joshuarobinson commented on issue #6435: PyIceberg: Avro decode EOF error
joshuarobinson commented on issue #6435: URL: https://github.com/apache/iceberg/issues/6435#issuecomment-1353182160 The table in question has only one snapshot. I'm attaching the json and avro metadata files for this table. [iceberg_6435_metadata.zip](https://github.com/apache/iceberg/files/10237824/iceberg_6435_metadata.zip)
[GitHub] [iceberg] Fokko commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
Fokko commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353199075 @cccs-eric Yes, please do!
[GitHub] [iceberg] nastra closed pull request #6436: Core: Add flag to control sending metric reports via REST
nastra closed pull request #6436: Core: Add flag to control sending metric reports via REST URL: https://github.com/apache/iceberg/pull/6436
[GitHub] [iceberg] stevenzwu merged pull request #6412: Doc: Modify some options refer to Read-options in flink streaming rea…
stevenzwu merged PR #6412: URL: https://github.com/apache/iceberg/pull/6412
[GitHub] [iceberg] stevenzwu commented on pull request #6412: Doc: Modify some options refer to Read-options in flink streaming rea…
stevenzwu commented on PR #6412: URL: https://github.com/apache/iceberg/pull/6412#issuecomment-1353410391 thx @xwmr-max for the update
[GitHub] [iceberg] Fokko opened a new pull request, #6437: Python: Projection by Field ID
Fokko opened a new pull request, #6437: URL: https://github.com/apache/iceberg/pull/6437 instead of name
[GitHub] [iceberg] nastra commented on a diff in pull request #6399: API: Add strict metadata cleanup to SnapshotProducer
nastra commented on code in PR #6399: URL: https://github.com/apache/iceberg/pull/6399#discussion_r1049921925

## core/src/main/java/org/apache/iceberg/SnapshotProducer.java: ##
@@ -396,7 +399,9 @@ public void commit() {
     } catch (CommitStateUnknownException commitStateUnknownException) {
       throw commitStateUnknownException;
     } catch (RuntimeException e) {
-      Exceptions.suppressAndThrow(e, this::cleanAll);
+      if (!strictCleanup || e instanceof CleanableFailure) {
+        Exceptions.suppressAndThrow(e, this::cleanAll);

Review Comment: just curious, would we need to check for `strictCleanup` in https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L453-L463 as well?
[GitHub] [iceberg] szehon-ho commented on pull request #6419: Doc:Example of correcting the document add/drop partition truncate
szehon-ho commented on PR #6419: URL: https://github.com/apache/iceberg/pull/6419#issuecomment-1353527618 I may be missing something, but don't both `truncate(data, 4)` and `truncate(4, data)` do the same thing?
[GitHub] [iceberg] flyrain commented on pull request #6427: Spark 3.2: Time range query of changelog tables
flyrain commented on PR #6427: URL: https://github.com/apache/iceberg/pull/6427#issuecomment-1353556854 Thanks @szehon-ho.
[GitHub] [iceberg] flyrain merged pull request #6427: Spark 3.2: Time range query of changelog tables
flyrain merged PR #6427: URL: https://github.com/apache/iceberg/pull/6427
[GitHub] [iceberg] e-gat commented on issue #6421: Running rewriteDataFiles on multiple executors in Spark
e-gat commented on issue #6421: URL: https://github.com/apache/iceberg/issues/6421#issuecomment-1353571205 After investigation, we found that the latest Iceberg versions support running `rewriteDataFiles` across multiple executors in Spark.
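For readers looking for a starting point, a distributed compaction call through the Spark actions API looks roughly like the sketch below. It is a hedged example: the option key and value shown are assumptions and may differ between Iceberg versions.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RewriteDataFilesExample {
  public static void compact(SparkSession spark, Table table) {
    RewriteDataFiles.Result result =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .binPack()
            // Assumed option: rewrite several file groups concurrently across the cluster.
            .option("max-concurrent-file-group-rewrites", "8")
            .execute();

    // Summarize how many data files were rewritten by the compaction.
    System.out.println("Rewritten data files: " + result.rewrittenDataFilesCount());
  }
}
```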
[GitHub] [iceberg] e-gat closed issue #6421: Running rewriteDataFiles on multiple executors in Spark
e-gat closed issue #6421: Running rewriteDataFiles on multiple executors in Spark URL: https://github.com/apache/iceberg/issues/6421
[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
flyrain commented on code in PR #6344: URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050040299 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.ChangelogOperation; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An iterator that transforms rows from changelog tables within a single Spark task. + * + * It marks the carry-over rows to null to for filtering out later. Carry-over rows are unchanged + * rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the + * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2, + * data='b') in a data file, if we only delete row2, the COW will copy row1 to a new data file and + * delete the whole old data file. The changelog table will have two delete-rows(row1 and row2), and + * one insert-row(row1). Row1 is a carry-over row. + * + * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two + * rows + * + * + * (id=1, data='a', op='DELETE') + * (id=1, data='b', op='INSERT') + * + * + * will be marked as update-rows: + * + * + * (id=1, data='a', op='UPDATE_BEFORE') + * (id=1, data='b', op='UPDATE_AFTER') + * + */ +public class ChangelogIterator implements Iterator, Serializable { + private static final String DELETE = ChangelogOperation.DELETE.name(); + private static final String INSERT = ChangelogOperation.INSERT.name(); + private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); + private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); + + private final Iterator rowIterator; + private final int changeTypeIndex; + private final List partitionIdx; + + private Row cachedRow = null; + + public ChangelogIterator( + Iterator rowIterator, int changeTypeIndex, List partitionIdx) { +this.rowIterator = rowIterator; +this.changeTypeIndex = changeTypeIndex; +this.partitionIdx = partitionIdx; + } + + @Override + public boolean hasNext() { +if (cachedRow != null) { + return true; +} +return rowIterator.hasNext(); + } + + @Override + public Row next() { +// if there is an updated cached row, return it directly +if (updated(cachedRow)) { + Row row = cachedRow; + cachedRow = null; + return row; +} + +Row currentRow = currentRow(); + +if (rowIterator.hasNext()) { + GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next(); + cachedRow = nextRow; + + if (updateOrCarryoverRecord(currentRow, nextRow)) { +Row[] rows = update((GenericRowWithSchema) currentRow, nextRow); + +currentRow = rows[0]; +cachedRow = rows[1]; + } +} + +return currentRow; + } + + private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) { +GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values()); +GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values()); + +if (isCarryoverRecord(deletedRow, insertedRow)) { + // set carry-over rows to null for filtering out later + return new Row[] {null, null}; Review Comment: Was trying to make it more readable by return 2 nulls. Will make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@ice
[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
flyrain commented on code in PR #6344: URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050041610 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.ChangelogOperation; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An iterator that transforms rows from changelog tables within a single Spark task. + * + * It marks the carry-over rows to null to for filtering out later. Carry-over rows are unchanged + * rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the + * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2, + * data='b') in a data file, if we only delete row2, the COW will copy row1 to a new data file and + * delete the whole old data file. The changelog table will have two delete-rows(row1 and row2), and + * one insert-row(row1). Row1 is a carry-over row. + * + * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two + * rows + * + * + * (id=1, data='a', op='DELETE') + * (id=1, data='b', op='INSERT') + * + * + * will be marked as update-rows: + * + * + * (id=1, data='a', op='UPDATE_BEFORE') + * (id=1, data='b', op='UPDATE_AFTER') + * + */ +public class ChangelogIterator implements Iterator, Serializable { + private static final String DELETE = ChangelogOperation.DELETE.name(); + private static final String INSERT = ChangelogOperation.INSERT.name(); + private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); + private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); + + private final Iterator rowIterator; + private final int changeTypeIndex; + private final List partitionIdx; + + private Row cachedRow = null; + + public ChangelogIterator( + Iterator rowIterator, int changeTypeIndex, List partitionIdx) { +this.rowIterator = rowIterator; +this.changeTypeIndex = changeTypeIndex; +this.partitionIdx = partitionIdx; + } + + @Override + public boolean hasNext() { +if (cachedRow != null) { + return true; +} +return rowIterator.hasNext(); + } + + @Override + public Row next() { +// if there is an updated cached row, return it directly +if (updated(cachedRow)) { + Row row = cachedRow; + cachedRow = null; + return row; +} + +Row currentRow = currentRow(); + +if (rowIterator.hasNext()) { + GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next(); + cachedRow = nextRow; + + if (updateOrCarryoverRecord(currentRow, nextRow)) { +Row[] rows = update((GenericRowWithSchema) currentRow, nextRow); + +currentRow = rows[0]; +cachedRow = rows[1]; + } +} + +return currentRow; + } + + private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) { +GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values()); +GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values()); + +if (isCarryoverRecord(deletedRow, insertedRow)) { + // set carry-over rows to null for filtering out later + return new Row[] {null, null}; +} else { + deletedRow.update(changeTypeIndex, UPDATE_BEFORE); + insertedRow.update(changeTypeIndex, UPDATE_AFTER); + + return new Row[] { +RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values()) Review Comment: `GenericInternalRow` is not a subclass of `Row`, we need to return a `Row` instance here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queri
[GitHub] [iceberg] bondarenko commented on issue #6257: Partitions metadata table shows old partitions
bondarenko commented on issue #6257: URL: https://github.com/apache/iceberg/issues/6257#issuecomment-1353617197 It looks like without `USING iceberg` you don't create an Iceberg table at all, so it doesn't even have to support updates, let alone the partitions metadata table.
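For context, creating a table as an Iceberg table from Spark requires the `USING iceberg` clause; only then do Iceberg features such as the `partitions` metadata table apply. A minimal sketch, where the `demo` catalog and table names are placeholders and the catalog is assumed to be configured as an Iceberg catalog:

```java
import org.apache.spark.sql.SparkSession;

public class CreateIcebergTableExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("create-iceberg-table").getOrCreate();

    // Without `USING iceberg`, Spark creates a plain (non-Iceberg) table instead.
    spark.sql(
        "CREATE TABLE demo.db.events (id BIGINT, data STRING) "
            + "USING iceberg "
            + "PARTITIONED BY (bucket(16, id))");

    spark.stop();
  }
}
```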
[GitHub] [iceberg] Fokko opened a new pull request, #6438: Python: Reduce the use of mock objects
Fokko opened a new pull request, #6438: URL: https://github.com/apache/iceberg/pull/6438 We use mocks extensively in our Python code; this dates from before we had certain functionality available, such as a working FileIO. Instead of using the mocks, we can now use the actual code.
[GitHub] [iceberg] Fokko opened a new pull request, #6439: Python: Add pyparsing
Fokko opened a new pull request, #6439: URL: https://github.com/apache/iceberg/pull/6439 This one was missing and was being pulled in transitively, I presume.
[GitHub] [iceberg] islamismailov commented on pull request #6353: Make sure S3 stream opened by ReadConf ctor is closed
islamismailov commented on PR #6353: URL: https://github.com/apache/iceberg/pull/6353#issuecomment-1353743389 I will try to update this PR with the feedback provided.
[GitHub] [iceberg] cccs-eric commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
cccs-eric commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353755124

> @cccs-eric it looks like we have to add `pyparsing` to the `pyproject.toml`. I don't know why it was working before, but it should have been added in #6259

@Fokko I don't understand what is going on. `pyparsing` is part of `pyproject.toml` and when I run `make test-s3`, *it works on my machine* ™️
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6365: Core: Add position deletes metadata table
RussellSpitzer commented on code in PR #6365: URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050207289 ## core/src/main/java/org/apache/iceberg/AbstractTableScan.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.expressions.ExpressionUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.DefaultMetricsContext; +import org.apache.iceberg.metrics.ImmutableScanReport; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.Timer; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTableScan> Review Comment: Probably need a doc on this explaining why this class exists -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6365: Core: Add position deletes metadata table
RussellSpitzer commented on code in PR #6365: URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050213598 ## core/src/main/java/org/apache/iceberg/AbstractTableScan.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.expressions.ExpressionUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.DefaultMetricsContext; +import org.apache.iceberg.metrics.ImmutableScanReport; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.Timer; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTableScan> +extends BaseScan { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class); + private ScanMetrics scanMetrics; + + protected AbstractTableScan( + TableOperations ops, Table table, Schema schema, TableScanContext context) { +super(ops, table, schema, context); + } + + protected Long snapshotId() { +return context().snapshotId(); + } + + protected Map options() { +return context().options(); + } + + protected abstract CloseableIterable doPlanFiles(); + + protected ScanMetrics scanMetrics() { +if (scanMetrics == null) { + this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext()); +} + +return scanMetrics; + } + + @Override + public Table table() { Review Comment: we are overriding this to make it public? Is this something other callers need? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on pull request #6425: Core: Unify fromJson(String) parsing
rdblue commented on PR #6425: URL: https://github.com/apache/iceberg/pull/6425#issuecomment-1353818545 Thanks, @nastra! Good to have these cleaned up.
[GitHub] [iceberg] rdblue merged pull request #6425: Core: Unify fromJson(String) parsing
rdblue merged PR #6425: URL: https://github.com/apache/iceberg/pull/6425
[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6365: Core: Add position deletes metadata table
szehon-ho commented on code in PR #6365: URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050217764 ## core/src/main/java/org/apache/iceberg/AbstractTableScan.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.expressions.ExpressionUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.DefaultMetricsContext; +import org.apache.iceberg.metrics.ImmutableScanReport; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.Timer; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTableScan> +extends BaseScan { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class); + private ScanMetrics scanMetrics; + + protected AbstractTableScan( + TableOperations ops, Table table, Schema schema, TableScanContext context) { +super(ops, table, schema, context); + } + + protected Long snapshotId() { Review Comment: Note: this is all just refactored from "BaseTableScan". So that PositionDeleteTableScan can share the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6365: Core: Add position deletes metadata table
RussellSpitzer commented on code in PR #6365: URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050219983

## core/src/main/java/org/apache/iceberg/BaseMetadataTable.java: ##
@@ -64,9 +64,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
     PartitionSpec.Builder identitySpecBuilder =
-        PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
+        PartitionSpec.builderFor(metadataTableSchema)
+            .withSpecId(spec.specId())
+            .checkConflicts(false);
     for (PartitionField field : spec.fields()) {
-      identitySpecBuilder.add(field.fieldId(), field.name(), Transforms.identity());
+      identitySpecBuilder.add(
+          field.fieldId(), field.fieldId(), field.name(), Transforms.identity());

Review Comment: Shouldn't this be field.sourceId(), field.fieldId()?
[GitHub] [iceberg] rdblue commented on issue #6415: Vectorized Read Issue
rdblue commented on issue #6415: URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1353829220 I agree with the analysis that the problem is that we are returning dictionary-encoded Arrow vectors. Maybe we're not doing that the right way. I'll take a look at #3024. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] sfc-gh-mparmar commented on a diff in pull request #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables
sfc-gh-mparmar commented on code in PR #6428: URL: https://github.com/apache/iceberg/pull/6428#discussion_r1050236173 ## snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java: ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnowflakeCatalogTest { Review Comment: For the first phase, we are supporting a small subset (read-only) of capabilities for the catalog which doesn't include the ability to create/modify namespaces, tables etc. so we can't repurpose CatalogTests in this case yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] closed issue #4931: Rewrite Data Files with Manual Sort Order should also use Table Partitioning in Sort Order
github-actions[bot] closed issue #4931: Rewrite Data Files with Manual Sort Order should also use Table Partitioning in Sort Order URL: https://github.com/apache/iceberg/issues/4931 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] github-actions[bot] commented on issue #4931: Rewrite Data Files with Manual Sort Order should also use Table Partitioning in Sort Order
github-actions[bot] commented on issue #4931: URL: https://github.com/apache/iceberg/issues/4931#issuecomment-1353917180 This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] dennishuo commented on a diff in pull request #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables
dennishuo commented on code in PR #6428: URL: https://github.com/apache/iceberg/pull/6428#discussion_r1050257006 ## snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java: ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnowflakeCatalogTest { Review Comment: Good suggestion! I recall skimming over it when looking at the JdbcCatalog's unittest, but saw most of the test cases were for (currently) unsupported functionality. Just for completeness though I went ahead and plugged it in (which helped identify the need to specify `test { useJUnitPlatform() }` in build.gradle to successfully inherit the tests) and verified that they all fail due to depending on at least `buildTable/createNamespace` to set up test data, as @sfc-gh-mparmar mentioned. One possibility if we want to split out read-only variations even for temporary scenarios, would be to abstract out the mutations into helper methods in the `CatalogTests` base class, and only test cases that explicitly test mutations would call the catalog directly, letting read-only stuff do the mutations in their fake in-memory data model. It seems like the `supports*` test methods are also pointing towards possibly wanting to extract a more formalized way to define differences in supported functionality between catalogs, perhaps more fine-grained than what things like e.g. `SupportsNamespaces` java interfaces provide. Is there any work in progress to do something for Catalog interfaces analogous to the [HCFS efforts for HadoopFileSystem](https://cwiki.apache.org/confluence/display/HADOOP2/HCFS) ? 
Seems like CatalogTests may have already been written with the analogous [AbstractFSContractTestBase](https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java) in mind, where we could move towards analogous ["contract" definitions](https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/LocalFSContract.java) and [contract config specs](https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
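A hypothetical sketch of the direction described above, with invented names that do not come from the PR: setup-only mutations go through overridable helpers, so a read-only catalog's test subclass can seed a fake in-memory backend while mutation-focused tests still call the catalog directly.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Illustrative shape only; class, method, and flag names are invented here.
public abstract class ReadOnlyFriendlyCatalogTests<C extends Catalog> {

  protected abstract C catalog();

  // Setup-only mutation: read-only catalogs would override this to populate
  // an in-memory fixture instead of mutating through the catalog.
  protected void createTestTable(TableIdentifier ident, Schema schema) {
    catalog().buildTable(ident, schema).create();
  }

  // Fine-grained capability flag, in the spirit of the existing supports*() methods.
  protected boolean supportsTableCreation() {
    return true;
  }
}
```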
[GitHub] [iceberg] jackye1995 merged pull request #6152: Docs: Update table snapshot retention property descriptions
jackye1995 merged PR #6152: URL: https://github.com/apache/iceberg/pull/6152 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] jackye1995 commented on pull request #6152: Docs: Update table snapshot retention property descriptions
jackye1995 commented on PR #6152: URL: https://github.com/apache/iceberg/pull/6152#issuecomment-1353975897 Thanks for the fix! @amogh-jahagirdar, and thanks for the review @singhpk234 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
flyrain commented on code in PR #6344: URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050273317 ## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import static org.apache.iceberg.ChangelogOperation.DELETE; +import static org.apache.iceberg.ChangelogOperation.INSERT; +import static org.apache.iceberg.ChangelogOperation.UPDATE_AFTER; +import static org.apache.iceberg.ChangelogOperation.UPDATE_BEFORE; + +import java.util.List; +import java.util.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.junit.Test; + +public class TestChangelogIterator { + private final List rows = + Lists.newArrayList( + new GenericRowWithSchema(new Object[] {1, "a", "data", "DELETE"}, null), + new GenericRowWithSchema(new Object[] {1, "a", "new_data", "INSERT"}, null), + // next two rows belong to different partitions + new GenericRowWithSchema(new Object[] {2, "b", "data", "DELETE"}, null), + new GenericRowWithSchema(new Object[] {3, "c", "data", "INSERT"}, null), + new GenericRowWithSchema(new Object[] {4, "d", "data", "DELETE"}, null), + new GenericRowWithSchema(new Object[] {4, "d", "data", "INSERT"}, null)); + + private final int changeTypeIndex = 3; + private final List partitionIdx = Lists.newArrayList(0, 1); + + @Test + public void testUpdatedRows() { Review Comment: Made the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
flyrain commented on code in PR #6344: URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050273608 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.ChangelogOperation; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An iterator that transforms rows from changelog tables within a single Spark task. + * + * It marks the carry-over rows to null to for filtering out later. Carry-over rows are unchanged + * rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the + * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2, + * data='b') in a data file, if we only delete row2, the COW will copy row1 to a new data file and + * delete the whole old data file. The changelog table will have two delete-rows(row1 and row2), and + * one insert-row(row1). Row1 is a carry-over row. + * + * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two + * rows + * + * + * (id=1, data='a', op='DELETE') + * (id=1, data='b', op='INSERT') + * + * + * will be marked as update-rows: + * + * + * (id=1, data='a', op='UPDATE_BEFORE') + * (id=1, data='b', op='UPDATE_AFTER') + * + */ +public class ChangelogIterator implements Iterator, Serializable { + private static final String DELETE = ChangelogOperation.DELETE.name(); + private static final String INSERT = ChangelogOperation.INSERT.name(); + private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); + private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); + + private final Iterator rowIterator; + private final int changeTypeIndex; + private final List partitionIdx; + + private Row cachedRow = null; + + public ChangelogIterator( + Iterator rowIterator, int changeTypeIndex, List partitionIdx) { +this.rowIterator = rowIterator; +this.changeTypeIndex = changeTypeIndex; +this.partitionIdx = partitionIdx; + } + + @Override + public boolean hasNext() { +if (cachedRow != null) { + return true; +} +return rowIterator.hasNext(); + } + + @Override + public Row next() { +// if there is an updated cached row, return it directly +if (updated(cachedRow)) { + Row row = cachedRow; + cachedRow = null; + return row; +} + +Row currentRow = currentRow(); + +if (rowIterator.hasNext()) { + GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next(); + cachedRow = nextRow; + + if (updateOrCarryoverRecord(currentRow, nextRow)) { +Row[] rows = update((GenericRowWithSchema) currentRow, nextRow); + +currentRow = rows[0]; +cachedRow = rows[1]; + } +} + +return currentRow; + } + + private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) { +GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values()); +GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values()); + +if (isCarryoverRecord(deletedRow, insertedRow)) { + // set carry-over rows to null for filtering out later + return new Row[] {null, null}; Review Comment: A static method is used for concatenating two iterators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
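If it helps to picture the change flyrain mentions, a minimal sketch of exposing iterator concatenation as a static helper could delegate to the relocated Guava Iterators; this is an assumption about the shape, not the PR's actual code.

```java
import java.util.Iterator;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.spark.sql.Row;

// Illustrative sketch only: a static helper that concatenates two row iterators.
class RowIteratorsSketch {
  static Iterator<Row> concat(Iterator<Row> first, Iterator<Row> second) {
    return Iterators.concat(first, second);
  }
}
```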
[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator
flyrain commented on code in PR #6344: URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050273739 ## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.ChangelogOperation; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; + +/** + * An iterator that transforms rows from changelog tables within a single Spark task. + * + * It marks the carry-over rows to null to for filtering out later. Carry-over rows are unchanged + * rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the + * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2, + * data='b') in a data file, if we only delete row2, the COW will copy row1 to a new data file and + * delete the whole old data file. The changelog table will have two delete-rows(row1 and row2), and + * one insert-row(row1). Row1 is a carry-over row. + * + * The iterator marks the delete-row and insert-row to be the update-rows. 
For example, these two + * rows + * + * + * (id=1, data='a', op='DELETE') + * (id=1, data='b', op='INSERT') + * + * + * will be marked as update-rows: + * + * + * (id=1, data='a', op='UPDATE_BEFORE') + * (id=1, data='b', op='UPDATE_AFTER') + * + */ +public class ChangelogIterator implements Iterator, Serializable { + private static final String DELETE = ChangelogOperation.DELETE.name(); + private static final String INSERT = ChangelogOperation.INSERT.name(); + private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name(); + private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name(); + + private final Iterator rowIterator; + private final int changeTypeIndex; + private final List partitionIdx; + + private Row cachedRow = null; + + public ChangelogIterator( + Iterator rowIterator, int changeTypeIndex, List partitionIdx) { +this.rowIterator = rowIterator; +this.changeTypeIndex = changeTypeIndex; +this.partitionIdx = partitionIdx; + } + + @Override + public boolean hasNext() { +if (cachedRow != null) { + return true; +} +return rowIterator.hasNext(); + } + + @Override + public Row next() { +// if there is an updated cached row, return it directly +if (updated(cachedRow)) { + Row row = cachedRow; + cachedRow = null; + return row; +} + +Row currentRow = currentRow(); + +if (rowIterator.hasNext()) { + GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next(); + cachedRow = nextRow; + + if (updateOrCarryoverRecord(currentRow, nextRow)) { +Row[] rows = update((GenericRowWithSchema) currentRow, nextRow); + +currentRow = rows[0]; +cachedRow = rows[1]; + } +} + +return currentRow; + } + + private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) { +GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values()); +GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values()); + +if (isCarryoverRecord(deletedRow, insertedRow)) { + // set carry-over rows to null for filtering out later + return new Row[] {null, null}; +} else { + deletedRow.update(changeTypeIndex, UPDATE_BEFORE); + insertedRow.update(changeTypeIndex, UPDATE_AFTER); + + return new Row[] { +RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values()) + }; +} + } + + private boolean isCarryoverRecord(GenericInternalRow deletedRow, GenericInternalRow insertedRow) { +// set the change_type to the same value +deletedRow.update(changeTypeIndex, ""); +insertedRow.update(changeTypeIndex, ""); +return deletedRow.equals(insertedRow); + } + + private boolean updated(Row row) {
[GitHub] [iceberg] flyrain commented on pull request #6344: Spark 3.3: Introduce the changelog iterator
flyrain commented on PR #6344: URL: https://github.com/apache/iceberg/pull/6344#issuecomment-1354023746 Thanks @szehon-ho and @RussellSpitzer for the review. Resolved all comments. Ready for another look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN
rdblue commented on code in PR #6402: URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050275269 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java: ## @@ -246,18 +248,70 @@ private static Optional convertFieldAndLiteral( org.apache.flink.table.expressions.Expression left = args.get(0); org.apache.flink.table.expressions.Expression right = args.get(1); -if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) { - String name = ((FieldReferenceExpression) left).getName(); - Optional lit = convertLiteral((ValueLiteralExpression) right); +Optional lit; +if (left instanceof FieldReferenceExpression) { + lit = convertExpression(right); if (lit.isPresent()) { -return Optional.of(convertLR.apply(name, lit.get())); +return Optional.of(convertLR.apply(((FieldReferenceExpression) left).getName(), lit.get())); Review Comment: I don't see a reason to remove the `name` variable in this block or the mirror one. That introduces needless changes. Can you roll that change back? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN
rdblue commented on code in PR #6402: URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277049 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java: ## @@ -246,18 +248,70 @@ private static Optional convertFieldAndLiteral( org.apache.flink.table.expressions.Expression left = args.get(0); org.apache.flink.table.expressions.Expression right = args.get(1); -if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) { - String name = ((FieldReferenceExpression) left).getName(); - Optional lit = convertLiteral((ValueLiteralExpression) right); +Optional lit; +if (left instanceof FieldReferenceExpression) { + lit = convertExpression(right); if (lit.isPresent()) { -return Optional.of(convertLR.apply(name, lit.get())); +return Optional.of(convertLR.apply(((FieldReferenceExpression) left).getName(), lit.get())); } -} else if (left instanceof ValueLiteralExpression -&& right instanceof FieldReferenceExpression) { - Optional lit = convertLiteral((ValueLiteralExpression) left); - String name = ((FieldReferenceExpression) right).getName(); +} else if (right instanceof FieldReferenceExpression) { + lit = convertExpression(left); if (lit.isPresent()) { -return Optional.of(convertRL.apply(name, lit.get())); +return Optional.of( +convertRL.apply(((FieldReferenceExpression) right).getName(), lit.get())); + } +} + +return Optional.empty(); + } + + private static Optional convertExpression( + org.apache.flink.table.expressions.Expression expression) { +if (expression instanceof ValueLiteralExpression) { + return convertLiteral((ValueLiteralExpression) expression); +} else if (expression instanceof CallExpression) { + return convertCallExpression((CallExpression) expression); +} +return Optional.empty(); + } + + private static Optional convertCallExpression(CallExpression call) { +if (!BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { + return Optional.empty(); +} + +List args = call.getResolvedChildren(); +if (args.size() != 2) { + return Optional.empty(); +} + +org.apache.flink.table.expressions.Expression left = args.get(0); +org.apache.flink.table.expressions.Expression right = args.get(1); + +if (left instanceof ValueLiteralExpression && right instanceof TypeLiteralExpression) { + ValueLiteralExpression value = (ValueLiteralExpression) left; + TypeLiteralExpression type = (TypeLiteralExpression) right; + + LogicalType logicalType = type.getOutputDataType().getLogicalType(); + + Optional result = value.getValueAs(logicalType.getDefaultConversion()); + if (result.isPresent()) { +return Optional.of(result.get()); + } + + switch (logicalType.getTypeRoot()) { +case DOUBLE: + Optional strValue = value.getValueAs(String.class); + if (strValue.isPresent()) { +return Optional.of(Double.valueOf(strValue.get())); Review Comment: I think that this needs to handle `NumberFormatException` in case someone uses an expression like `cast("fail" as double)`. You could put a try/except around the switch and return `Optional.empty`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
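A minimal sketch of the suggested handling, with simplified names that are not taken from the PR: wrap the numeric parsing in a try/catch and fall back to an empty Optional when the cast literal is not a valid number.

```java
import java.util.Optional;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

// Illustrative sketch only: give up on pushdown instead of throwing when the
// literal cannot be parsed, e.g. CAST('fail' AS DOUBLE).
class CastLiteralSketch {
  static Optional<Object> parse(String strValue, LogicalTypeRoot typeRoot) {
    try {
      switch (typeRoot) {
        case DOUBLE:
          return Optional.of(Double.valueOf(strValue));
        case FLOAT:
          return Optional.of(Float.valueOf(strValue));
        default:
          return Optional.empty();
      }
    } catch (NumberFormatException e) {
      // fall back to no pushdown instead of failing the query plan
      return Optional.empty();
    }
  }
}
```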
[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN
rdblue commented on code in PR #6402: URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277901 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java: ## @@ -603,7 +603,108 @@ public void testFilterPushDown2Literal() { } @Test - public void testSqlParseNaN() { -// todo add some test case to test NaN + public void testFilterNaN() { +final String tableName = "test_table_nan"; +try { + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')", + tableName, format.name()); + sql( + "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)", + tableName); + + String sqlNaNDoubleEqual = + String.format("SELECT * FROM %s WHERE d = CAST('NaN' AS DOUBLE) ", tableName); + List resultDoubleEqual = sql(sqlNaNDoubleEqual); + Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size()); + + String sqlNaNDoubleNotEqual = + String.format("SELECT * FROM %s WHERE d <> CAST('NaN' AS DOUBLE) ", tableName); + List resultDoubleNotEqual = sql(sqlNaNDoubleNotEqual); + List expectedDouble = + Lists.newArrayList( + Row.of(1, "iceberg", 10.0d, 1.1f), + Row.of(2, "b", 20.0d, 2.2f), + Row.of(3, null, 30.0d, 3.3f), + Row.of(4, "d", Double.NaN, 4.4f)); Review Comment: Why is this matching? The field is NaN. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN
rdblue commented on code in PR #6402: URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050278409 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java: ## @@ -603,7 +603,108 @@ public void testFilterPushDown2Literal() { } @Test - public void testSqlParseNaN() { -// todo add some test case to test NaN + public void testFilterNaN() { +final String tableName = "test_table_nan"; +try { + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')", + tableName, format.name()); + sql( + "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)", + tableName); + + String sqlNaNDoubleEqual = + String.format("SELECT * FROM %s WHERE d = CAST('NaN' AS DOUBLE) ", tableName); + List resultDoubleEqual = sql(sqlNaNDoubleEqual); + Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size()); Review Comment: This should have 1 record. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN
rdblue commented on code in PR #6402: URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050278958 ## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java: ## @@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() { } @Test - public void testSqlParseNaN() { -// todo add some test case to test NaN + public void testFilterNaN() { +sql( +"CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')", +TABLE_NAME_NAN, format.name()); +sql( +"INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)", +TABLE_NAME_NAN); + +String sqlNaNDoubleEqual = +String.format("SELECT * FROM %s WHERE d = CAST('NaN' AS DOUBLE) ", TABLE_NAME_NAN); +List resultDoubleEqual = sql(sqlNaNDoubleEqual); +Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size()); Review Comment: See my note below on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on pull request #6402: Flink: Add UT for NaN
rdblue commented on PR #6402: URL: https://github.com/apache/iceberg/pull/6402#issuecomment-1354038557 @hililiwei, I flagged the test cases in my review, but I now see that @stevenzwu did as well. The problem is that NaN comparison should always result in `false`. That's why Iceberg doesn't allow `NaN` as a literal value in expressions. If you try to create an expression with a [`NaN` literal, it will fail](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/expressions/Literals.java#L61). That forces callers to be specific. If the caller intends to compare with `NaN`, then the comparison should be `false` and there's no need to pass it to Iceberg. If the caller intends to check whether a value is `NaN`, then there is the [`isNaN` unary predicate](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/expressions/Expressions.java#LL125C28-L125C28). It is up to Flink how to map expressions. In Spark, `NaN` is a valid comparison, so Spark filter translation converts `x = NaN` to `isNaN(x)`. Flink could do something similar. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
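A sketch of the kind of translation described above, using Iceberg's public expression API; how Flink's filter conversion would actually wire this in is left open, and the not-equals mapping is included only as an assumption following Spark-style NaN semantics.

```java
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

// Illustrative sketch only: rewrite NaN comparisons to Iceberg's unary predicates
// instead of building an (invalid) NaN literal.
class NaNFilterTranslationSketch {
  static Expression translateEquals(String columnName, double literal) {
    if (Double.isNaN(literal)) {
      return Expressions.isNaN(columnName);   // x = NaN  ->  isNaN(x)
    }
    return Expressions.equal(columnName, literal);
  }

  static Expression translateNotEquals(String columnName, double literal) {
    if (Double.isNaN(literal)) {
      return Expressions.notNaN(columnName);  // x <> NaN ->  notNaN(x), under Spark-like semantics
    }
    return Expressions.notEqual(columnName, literal);
  }
}
```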
[GitHub] [iceberg] rdblue merged pull request #6439: Python: Add pyparsing
rdblue merged PR #6439: URL: https://github.com/apache/iceberg/pull/6439 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on pull request #6439: Python: Add pyparsing
rdblue commented on PR #6439: URL: https://github.com/apache/iceberg/pull/6439#issuecomment-1354039711 I think this was my mistake. I thought it was part of the standard library since I didn't need to install it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
rdblue commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1354040055 @cccs-eric, I just merged #6439 that fixes the pyparsing problem (my fault), so you should be able to rebase and get tests working. Sorry about that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6074: API,Core: SnapshotManager to be created through Transaction
rdblue commented on code in PR #6074: URL: https://github.com/apache/iceberg/pull/6074#discussion_r1050282810 ## core/src/main/java/org/apache/iceberg/SnapshotManager.java: ## @@ -30,6 +31,17 @@ public class SnapshotManager implements ManageSnapshots { ops.current() != null, "Cannot manage snapshots: table %s does not exist", tableName); this.transaction = new BaseTransaction(tableName, ops, BaseTransaction.TransactionType.SIMPLE, ops.refresh()); +this.isExternalTransaction = false; + } + + SnapshotManager(BaseTransaction transaction) { +Preconditions.checkNotNull(transaction, "Input transaction cannot be null"); +Preconditions.checkNotNull( Review Comment: @gaborkaszab, this is what I'm talking about with the create transaction comment. Why is this precondition needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050284139 ## python/pyiceberg/expressions/visitors.py: ## @@ -753,3 +757,68 @@ def inclusive_projection( schema: Schema, spec: PartitionSpec, case_sensitive: bool = True ) -> Callable[[BooleanExpression], BooleanExpression]: return InclusiveProjection(schema, spec, case_sensitive).project + + +class _ExpressionProjector(BooleanExpressionVisitor[BooleanExpression]): +"""Rewrites a boolean expression by replacing unbound references with references to fields in a struct schema + +Args: + table_schema (Schema): The schema of the table + file_schema (Schema): The schema of the file + case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True + +Raises: +TypeError: In the case a predicate is already bound +""" + +table_schema: Schema +file_schema: Schema +case_sensitive: bool + +def __init__(self, table_schema: Schema, file_schema: Schema, case_sensitive: bool) -> None: +self.table_schema = table_schema +self.file_schema = file_schema +self.case_sensitive = case_sensitive + +def visit_true(self) -> BooleanExpression: +return AlwaysTrue() + +def visit_false(self) -> BooleanExpression: +return AlwaysFalse() + +def visit_not(self, child_result: BooleanExpression) -> BooleanExpression: +return Not(child=child_result) + +def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: +return And(left=left_result, right=right_result) + +def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: +return Or(left=left_result, right=right_result) + +def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression: +if not isinstance(predicate.term, Reference): +raise ValueError(f"Exprected reference: {predicate.term}") + +field = self.table_schema.find_field(predicate.term.name, case_sensitive=self.case_sensitive) +file_column_name = self.file_schema.find_column_name(field.field_id) + +if not file_column_name: +raise ValueError(f"Not found in schema: {file_column_name}") + +if isinstance(predicate, UnaryPredicate): +return predicate.__class__(Reference(file_column_name)) Review Comment: Shouldn't this be `predicate.as_unbound(Reference(file_column_name))`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050284725 ## python/pyiceberg/expressions/visitors.py: ## @@ -753,3 +757,68 @@ def inclusive_projection( schema: Schema, spec: PartitionSpec, case_sensitive: bool = True ) -> Callable[[BooleanExpression], BooleanExpression]: return InclusiveProjection(schema, spec, case_sensitive).project + + +class _ExpressionProjector(BooleanExpressionVisitor[BooleanExpression]): +"""Rewrites a boolean expression by replacing unbound references with references to fields in a struct schema + +Args: + table_schema (Schema): The schema of the table + file_schema (Schema): The schema of the file + case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True + +Raises: +TypeError: In the case a predicate is already bound +""" + +table_schema: Schema +file_schema: Schema +case_sensitive: bool + +def __init__(self, table_schema: Schema, file_schema: Schema, case_sensitive: bool) -> None: +self.table_schema = table_schema +self.file_schema = file_schema +self.case_sensitive = case_sensitive + +def visit_true(self) -> BooleanExpression: +return AlwaysTrue() + +def visit_false(self) -> BooleanExpression: +return AlwaysFalse() + +def visit_not(self, child_result: BooleanExpression) -> BooleanExpression: +return Not(child=child_result) + +def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: +return And(left=left_result, right=right_result) + +def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: +return Or(left=left_result, right=right_result) + +def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression: +if not isinstance(predicate.term, Reference): +raise ValueError(f"Exprected reference: {predicate.term}") + +field = self.table_schema.find_field(predicate.term.name, case_sensitive=self.case_sensitive) +file_column_name = self.file_schema.find_column_name(field.field_id) + +if not file_column_name: +raise ValueError(f"Not found in schema: {file_column_name}") + +if isinstance(predicate, UnaryPredicate): +return predicate.__class__(Reference(file_column_name)) Review Comment: Oh, nevermind. I see that the input expression is unbound. I'm not sure whether it would be better to do this to a bound expression or not. That seems like good separation of responsibilities and a good way to have consistent error messages when an expression isn't valid for a particular schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050285350 ## python/pyiceberg/expressions/visitors.py: ## @@ -753,3 +757,68 @@ def inclusive_projection( schema: Schema, spec: PartitionSpec, case_sensitive: bool = True ) -> Callable[[BooleanExpression], BooleanExpression]: return InclusiveProjection(schema, spec, case_sensitive).project + + +class _ExpressionProjector(BooleanExpressionVisitor[BooleanExpression]): Review Comment: We already use "project" to describe producing an expression for transformed values, so I think it would be confusing to name this "projector". What about something like "name translator"? I think mentioning that it rewrites expression names is a good thing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics
yegangy0718 commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1050285783 ## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. 
+ */ +@Internal +public class ShuffleOperator +extends AbstractStreamOperator> +implements OneInputStreamOperator>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector keySelector; + // the type of the key to collect data statistics + private final TypeInformation keyType; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + private transient Map localDataStatisticsMap; + private transient Map globalDataDistributionWeightMap; + private transient ListState> globalDataDistributionWeightState; + + public ShuffleOperator( + KeySelector keySelector, + TypeInformation keyType, + OperatorEventGateway operatorEventGateway) { +this.keySelector = keySelector; +this.keyType = keyType; +this.operatorEventGateway = operatorEventGateway; + } + + @VisibleForTesting + ListStateDescriptor> generateGlobalDataDistributionWeightDescriptor() { +return new ListStateDescriptor<>( +"globalDataDistributionWeight", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class))); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { +localDataStatisticsMap = Maps.newHashMap(); +globalDataDistributionWeightState = Review Comment: SGTM. I can create a followup PR to add scaling up test case to check the behavior for BroadcastState. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050286173 ## python/pyiceberg/io/pyarrow.py: ## @@ -437,3 +457,103 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +class _ConstructFinalSchema(SchemaVisitor[pa.ChunkedArray]): +file_schema: Schema +table: pa.Table + +def __init__(self, file_schema: Schema, table: pa.Table): +self.file_schema = file_schema +self.table = table + +def schema(self, schema: Schema, struct_result: List[pa.ChunkedArray]) -> pa.Table: +return pa.table(struct_result, schema=schema_to_pyarrow(schema)) + +def struct(self, _: StructType, field_results: List[pa.ChunkedArray]) -> List[pa.ChunkedArray]: +return field_results + +def field(self, field: NestedField, _: pa.ChunkedArray) -> pa.ChunkedArray: +column_name = self.file_schema.find_column_name(field.field_id) + +if column_name: +column_idx = self.table.schema.get_field_index(column_name) +else: +column_idx = -1 + +expected_arrow_type = schema_to_pyarrow(field.field_type) + +# The idx will be -1 when the column can't be found +if column_idx >= 0: +column_field: pa.Field = self.table.schema[column_idx] +column_arrow_type: pa.DataType = column_field.type +column_data: pa.ChunkedArray = self.table[column_idx] + +# In case of schema evolution +if column_arrow_type != expected_arrow_type: +column_data = column_data.cast(expected_arrow_type) +else: +import numpy as np + +column_data = pa.array(np.full(shape=len(self.table), fill_value=None), type=expected_arrow_type) +return column_data + +def list(self, _: ListType, element_result: pa.ChunkedArray) -> pa.ChunkedArray: +pass + +def map(self, _: MapType, key_result: pa.ChunkedArray, value_result: pa.ChunkedArray) -> pa.DataType: +pass + +def primitive(self, primitive: PrimitiveType) -> pa.ChunkedArray: +pass + + +def to_final_schema(final_schema: Schema, schema: Schema, table: pa.Table) -> pa.Table: +return visit(final_schema, _ConstructFinalSchema(schema, table)) + + +def project_table( +files: Iterable["FileScanTask"], table: "Table", row_filter: BooleanExpression, projected_schema: Schema, case_sensitive: bool +) -> pa.Table: +if isinstance(table.io, PyArrowFileIO): +scheme, path = PyArrowFileIO.parse_location(table.location()) +fs = table.io.get_fs(scheme) +else: +raise ValueError(f"Expected PyArrowFileIO, got: {table.io}") + +projected_field_ids = projected_schema.field_ids + +tables = [] +for task in files: +_, path = PyArrowFileIO.parse_location(task.file.file_path) + +# Get the schema +with fs.open_input_file(path) as fout: +parquet_schema = pq.read_schema(fout) +schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA) +file_schema = Schema.parse_raw(schema_raw) + +file_project_schema = prune_columns(file_schema, projected_field_ids) + +pyarrow_filter = None +if row_filter is not AlwaysTrue(): +row_filter = project_expression(row_filter, table.schema(), file_schema, case_sensitive=case_sensitive) +bound_row_filter = bind(file_schema, row_filter, case_sensitive=case_sensitive) +pyarrow_filter = expression_to_pyarrow(bound_row_filter) + +if file_schema is None: +raise ValueError(f"Iceberg schema not encoded in Parquet file: {path}") Review Comment: Missing Iceberg schema for file: {path}? That avoids negation and needing to understand why it would be "encoded" in a file. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
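For context, a minimal sketch of the suggested wording, reusing the names from the hunk above (`fout`, `ICEBERG_SCHEMA`, `path`); this is not the PR's final code. Checking the raw footer metadata directly also lets the check run before `Schema.parse_raw`, which would otherwise fail first on a missing value:

```python
# Hedged sketch, not PR code: fail early with the suggested, non-negated message
# when the Parquet footer carries no serialized Iceberg schema.
schema_raw = pq.read_schema(fout).metadata.get(ICEBERG_SCHEMA)
if schema_raw is None:
    raise ValueError(f"Missing Iceberg schema for file: {path}")
file_schema = Schema.parse_raw(schema_raw)
```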
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050287321

## python/pyiceberg/io/pyarrow.py:
(quotes the same @@ -437,3 +457,103 @@ hunk shown above; this comment is anchored at these lines:)

+        # Prune the stuff that we don't need anyway
+        file_project_schema_arrow = schema_to_pyarrow(file_project_schema)
+
+        arrow_table = ds.dataset(

Review Comment:
   What happens when one file has a column but others don't? For example, if I run `ALTER TABLE t ADD COLUMN c int` and read files before and after the change? Also, what happens if we end up with mixed types as well because c was promoted from `int` to `long`?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
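A standalone PyArrow sketch (not PR code; it matches columns by name rather than by field ID for brevity) of the two situations the question raises: a file written before `ALTER TABLE t ADD COLUMN c int`, and a file where `c` was still written as `int` before being promoted to `long`:

```python
import pyarrow as pa

# Files from different points in the table's history (illustrative data).
file_before_add = pa.table({"a": pa.array([1, 2], type=pa.int64())})  # written before ADD COLUMN c
file_before_promotion = pa.table(
    {"a": pa.array([3], type=pa.int64()), "c": pa.array([30], type=pa.int32())}  # c still stored as int32
)
expected = pa.schema([("a", pa.int64()), ("c", pa.int64())])  # current table schema: c promoted to long


def align(tbl: pa.Table) -> pa.Table:
    """Cast promoted columns up and fill columns the file predates with typed nulls."""
    columns = [
        tbl[f.name].cast(f.type) if f.name in tbl.schema.names else pa.nulls(len(tbl), type=f.type)
        for f in expected
    ]
    return pa.table(columns, schema=expected)


print(pa.concat_tables([align(file_before_add), align(file_before_promotion)]))
```

This mirrors what the `field` visitor above does per column: cast when the stored Arrow type differs from the expected one, and fill with nulls when the file has no column for the field.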
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050287724

## python/pyiceberg/io/pyarrow.py:
(quotes the same @@ -437,3 +457,103 @@ hunk shown above; this comment is anchored at these lines:)

+    tables = []
+    for task in files:

Review Comment:
   I think the inner part of this loop should be a Parquet method that we provide, so that the caller can read progressively or read parts in parallel tasks. This is a great start for single process, though.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
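A hedged sketch of the refactor the comment points toward: pull the per-file body into a reader callable so callers can parallelise or stream; `read_tasks` and `read_one_task` are hypothetical names, not part of the PR:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List

import pyarrow as pa


def read_tasks(tasks: Iterable, read_one_task: Callable[[object], pa.Table], max_workers: int = 8) -> pa.Table:
    """Apply the per-file reader to every scan task in parallel and combine the results.

    `read_one_task` stands in for the body of the `for task in files:` loop above.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        tables: List[pa.Table] = list(pool.map(read_one_task, tasks))
    return pa.concat_tables(tables)
```

`pool.map` preserves input order, so the combined table matches what the sequential loop would produce; threads tend to help here because the heavy lifting happens in Arrow's native Parquet reader.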
[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID
rdblue commented on code in PR #6437: URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050288238

## python/pyiceberg/io/pyarrow.py:
(quotes the same @@ -437,3 +457,103 @@ hunk shown above; this comment is anchored at this line:)

+        file_project_schema = prune_columns(file_schema, projected_field_ids)

Review Comment:
   This is going to produce a subset of the requested schema. If the file has columns a and b, but the requested schema has a, b, and c, then this is going to only have the ones from the file and will produce a dataset with a missing column. That's okay if Arrow knows how to handle it below in `concat_tables`, I think. We just need to be careful that we are producing an Arrow table that matches the requested schema, not just the file schemas.

-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
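A standalone illustration (not PR code) of the concern: per-file tables with different column sets cannot be concatenated as-is, so each one has to be brought up to the requested schema first, for example by filling the missing column with typed nulls:

```python
import pyarrow as pa

# One file written before column "c" existed, one written after (illustrative data).
old_file = pa.table({"a": pa.array([1, 2], type=pa.int64())})
new_file = pa.table({"a": pa.array([3], type=pa.int64()), "c": pa.array([30], type=pa.int64())})

try:
    pa.concat_tables([old_file, new_file])  # schemas differ, so this raises
except pa.ArrowInvalid as err:
    print(f"unaligned concat fails: {err}")

# Aligning the old file to the requested schema makes the concatenation succeed
# and guarantees the result matches the requested schema, not the file schemas.
requested = pa.schema([("a", pa.int64()), ("c", pa.int64())])
aligned_old = old_file.append_column("c", pa.nulls(len(old_file), type=pa.int64()))
result = pa.concat_tables([aligned_old, new_file])
assert result.schema == requested
```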
[GitHub] [iceberg] cccs-eric commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
cccs-eric commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1354058385 > @cccs-eric, I just merged #6439 that fixes the pyparsing problem (my fault) so you should be able to rebase and get tests working. Sorry about that! Thanks @rdblue, the build is now passing 🥳 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] cccs-eric commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)
cccs-eric commented on PR #6392: URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1354060350 @Fokko One last thing that I haven't done is modifying verify-release.md for the integration tests. Should I add `test-adlfs` in there? https://github.com/apache/iceberg/blob/master/python/mkdocs/docs/verify-release.md?plain=1#L86-L90 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[GitHub] [iceberg] xwmr-max opened a new pull request, #6440: Flink: Support Look-up Function
xwmr-max opened a new pull request, #6440: URL: https://github.com/apache/iceberg/pull/6440 Currently, Iceberg does not support look-up joins in Flink. This PR adds a look-up join function to cover basic join scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
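For readers new to the feature: a look-up join enriches each streaming row by probing a table at processing time. A rough PyFlink sketch of what usage could look like once Iceberg tables can serve as the lookup side; the table names, columns, and catalog options below are illustrative only and not taken from the PR:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A streaming fact source with a processing-time attribute (illustrative).
t_env.execute_sql("""
    CREATE TABLE orders (
        order_id BIGINT,
        customer_id BIGINT,
        proc_time AS PROCTIME()
    ) WITH ('connector' = 'datagen')
""")

# An Iceberg catalog holding the dimension table to look up against (illustrative options).
t_env.execute_sql("""
    CREATE CATALOG iceberg_catalog WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hadoop',
        'warehouse' = 'file:///tmp/warehouse'
    )
""")

# The look-up join: probe the dimension table as of each order's processing time.
result = t_env.execute_sql("""
    SELECT o.order_id, c.name
    FROM orders AS o
    JOIN iceberg_catalog.db.customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id
""")
```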