[GitHub] [iceberg] deadwind4 opened a new issue, #6429: [Feature Proposal] Log Store in Iceberg

2022-12-15 Thread GitBox


deadwind4 opened a new issue, #6429:
URL: https://github.com/apache/iceberg/issues/6429

   ### Feature Request / Improvement
   
   This proposal aims to improve Iceberg's real-time capabilities by 
introducing a log store system. Streaming reads would consume data that is 
cached in a log store (e.g., Kafka).
   
   Proposal document:
   
https://docs.google.com/document/d/1Zy4bbvSuN9ax7Tq4TuxL2vi4xQ4DR4wbR4QK5m69qQo/edit?usp=sharing
   
   Code demo:
   https://github.com/deadwind4/iceberg/tree/log-store
   
   ### Query engine
   
   None





[GitHub] [iceberg] Fokko opened a new issue, #6430: Python: Support for static table

2022-12-15 Thread GitBox


Fokko opened a new issue, #6430:
URL: https://github.com/apache/iceberg/issues/6430

   ### Feature Request / Improvement
   
   In Java we have StaticTableOperations: 
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/StaticTableOperations.java
   This allows the user to load a table directly from a metadata location. The 
resulting static table does not support any modifications. I think this would 
be valuable for PyIceberg as well, since it would enable users to easily pull 
in data or share datasets.
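
   For reference, a rough sketch of how the Java side can be used (hedged: the 
constructor arguments and the use of HadoopFileIO below are assumptions for 
illustration, and the metadata path is hypothetical):
   
   ```
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.BaseTable;
   import org.apache.iceberg.StaticTableOperations;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.hadoop.HadoopFileIO;
   
   // Pin a read-only table to one specific metadata file; commits through
   // these operations fail because the table is static.
   String metadataLocation = "s3://bucket/db/tbl/metadata/00002-abc.metadata.json";
   StaticTableOperations ops =
       new StaticTableOperations(metadataLocation, new HadoopFileIO(new Configuration()));
   Table table = new BaseTable(ops, metadataLocation);
   ```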
   
   ### Query engine
   
   None





[GitHub] [iceberg] Fokko commented on issue #3220: [Python] support iceberg hadoop catalog in python library

2022-12-15 Thread GitBox


Fokko commented on issue #3220:
URL: https://github.com/apache/iceberg/issues/3220#issuecomment-1352731893

   Hey everyone, I think we should split out the idea of implementing the full 
hadoop catalog, and just being able to read a table from a metadata URL. For 
the latter, I've created a new issue: 
https://github.com/apache/iceberg/issues/6430
   
   My apologies for invoking the negative feelings around the HadoopCatalog, 
and I fully agree that this is something we should avoid in PyIceberg. However, 
I do see a lot of value in just reading (not writing!) and just using a path to 
the metadata.
   
   @srilman I've recently started working on [integration 
tests](https://github.com/apache/iceberg/pull/6398) that write tables using 
Spark (since PyIceberg doesn't have any write support), and these are shared 
using the RestCatalog. Would love to get your thoughts on that!
   





[GitHub] [iceberg] Fokko commented on issue #6397: Python Instructions currently do not work for testing

2022-12-15 Thread GitBox


Fokko commented on issue #6397:
URL: https://github.com/apache/iceberg/issues/6397#issuecomment-1352734967

   @rubenvdg What do you think of removing the dataclass from `AvroStruct`? 
We should be able to create a Struct without including it in the PyIceberg 
class hierarchy. `AvroStruct` is currently frozen as well, but that doesn't 
make much sense since we also have a `set` method 🤔 😄 





[GitHub] [iceberg] zhongyujiang opened a new pull request, #6431: Parquet: Fix ParquetDictionaryRowGroupFilter evaluating NaN.

2022-12-15 Thread GitBox


zhongyujiang opened a new pull request, #6431:
URL: https://github.com/apache/iceberg/pull/6431

   This PR fixes how ParquetDictionaryRowGroupFilter evaluates `notNaN`. 
Because Parquet dictionaries cannot contain null values, 
ParquetDictionaryRowGroupFilter should check whether there are null values in 
the column chunk when evaluating `notNaN`, and return `true` if there are.
   
   This also improves looking up the `NaN` value in the dictionary set. In 
Java, both Double.NaN and Float.NaN are considered equal to themselves, so:
   - when the dictionary's size is greater than 1, it must contain values that 
are not `NaN`, so we can directly return `true` in this case;
   - we can use `Set#contains` to look up `NaN` directly instead of comparing 
the elements in the collection one by one (see the sketch below).
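
   A minimal sketch of this evaluation logic, assuming simplified types (the 
helper name and the `mayContainNulls` flag are illustrative, not the actual 
Iceberg API):
   
   ```
   import java.util.Set;
   
   // true means "rows might match", i.e. the row group may hold non-NaN values.
   static boolean notNaN(Set<Double> dictionary, boolean mayContainNulls) {
     if (mayContainNulls) {
       // Parquet dictionaries never hold nulls, so a null in the column
       // chunk is itself a value that satisfies notNaN.
       return true;
     }
   
     if (dictionary.size() > 1) {
       // NaN is equal to itself inside a Set, so at most one entry can be
       // NaN; with more than one entry, some value must be non-NaN.
       return true;
     }
   
     // Single-entry dictionary: rows might match unless that entry is NaN.
     return !dictionary.contains(Double.NaN);
   }
   ```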





[GitHub] [iceberg] zhongyujiang commented on pull request #6431: Parquet: Fix ParquetDictionaryRowGroupFilter evaluating NaN.

2022-12-15 Thread GitBox


zhongyujiang commented on PR #6431:
URL: https://github.com/apache/iceberg/pull/6431#issuecomment-1352852748

   @yyanyy @rdblue  Could you take a look?





[GitHub] [iceberg] rbalamohan opened a new pull request, #6432: Consider moving to ParallelIterable in Deletes::toPositionIndex

2022-12-15 Thread GitBox


rbalamohan opened a new pull request, #6432:
URL: https://github.com/apache/iceberg/pull/6432

   Issue: https://github.com/apache/iceberg/issues/6387
   
   When tables are updated in "merge-on-read" mode, positional delete files are 
created. Read performance degrades quite a bit, even with 4+ positional delete 
files (I tried with TPC-DS queries).
   
   Depending on the workload, a data file may require reading multiple 
"positional delete" files to construct its delete positions. This does not 
sound expensive, but when a large number of medium-sized files are present in 
a partition, a combined file task ends up with many files. A task then has to 
process the data files sequentially, and every data file reads multiple 
positional delete files, causing slowness.
   
   This PR uses "ParallelIterable" in "Deletes::toPositionIndex". RoaringBitmap 
isn't thread-safe, hence synchronization was added in 
BitmapPositionDeleteIndex. Tried this out in a local cluster and confirmed 
that it is not expensive.
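
   A rough sketch of the thread-safety part (assuming Roaring64Bitmap as the 
backing bitmap; the class and method names below are illustrative):
   
   ```
   import org.roaringbitmap.longlong.Roaring64Bitmap;
   
   class SynchronizedBitmapPositionDeleteIndex {
     private final Roaring64Bitmap bitmap = new Roaring64Bitmap();
   
     // Roaring64Bitmap is not thread-safe, so mutation and lookup are
     // serialized once positions arrive from ParallelIterable worker threads.
     synchronized void delete(long position) {
       bitmap.addLong(position);
     }
   
     synchronized boolean isDeleted(long position) {
       return bitmap.contains(position);
     }
   }
   ```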





[GitHub] [iceberg] jaceklaskowski opened a new pull request, #6433: Docs: README

2022-12-15 Thread GitBox


jaceklaskowski opened a new pull request, #6433:
URL: https://github.com/apache/iceberg/pull/6433

   Found some very minor "issues" while reading README.md and couldn't resist 
fixing them all up.





[GitHub] [iceberg] cccs-eric commented on a diff in pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


cccs-eric commented on code in PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#discussion_r1049565481


##
python/Makefile:
##
@@ -26,14 +26,21 @@ lint:
   poetry run pre-commit run --all-files
 
 test:
-   poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3" ${PYTEST_ARGS}
+   poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3 and not adlfs" ${PYTEST_ARGS}

Review Comment:
   Yeah, that sounds good. I'll leave my code as-is, and if my PR goes in 
before yours, you'll have to modify this line as part of conflict resolution.






[GitHub] [iceberg] grbinho commented on pull request #6223: AWS: Use provided glue catalog id in defaultWarehouseLocation

2022-12-15 Thread GitBox


grbinho commented on PR #6223:
URL: https://github.com/apache/iceberg/pull/6223#issuecomment-1352975547

   @JonasJ-ap @ajantha-bhat 
   Hi guys, can you advise how we can move this MR forward?





[GitHub] [iceberg] djouallah commented on issue #3220: [Python] support iceberg hadoop catalog in python library

2022-12-15 Thread GitBox


djouallah commented on issue #3220:
URL: https://github.com/apache/iceberg/issues/3220#issuecomment-1353002821

   For people who use only Python (no Spark, no Glue, none of this big-engine 
stuff), is there any simple-to-use catalog? I read about this REST thing, but 
it seems to be only a spec, not an actual piece of software?





[GitHub] [iceberg] djouallah commented on issue #6430: Python: Support for static table

2022-12-15 Thread GitBox


djouallah commented on issue #6430:
URL: https://github.com/apache/iceberg/issues/6430#issuecomment-1353005103

   FWIW, BigQuery has a *read only* implementation that uses the metadata 
location.





[GitHub] [iceberg] Fokko commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


Fokko commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353167480

   @cccs-eric it looks like we have to add `pyparsing` to the `pyproject.toml`. 
I don't know why it was working before, but it should have been added in 
https://github.com/apache/iceberg/pull/6259





[GitHub] [iceberg] joshuarobinson opened a new issue, #6434: PyIceberg support for UUID types

2022-12-15 Thread GitBox


joshuarobinson opened a new issue, #6434:
URL: https://github.com/apache/iceberg/issues/6434

   ### Feature Request / Improvement
   
   Currently, pyiceberg 0.2.0 fails when creating a table scan for any table 
(that I have, at least) with UUID columns.
   
   The root problem seems to be (thanks @Fokko for the explanation) that UUID 
is encoded as a string in Avro but as a fixed-width type in Iceberg.
   
   The failure looks like this:
   ```
   ValueError: Unknown logical/physical type combination: {'type': 'fixed', 'name': 'uuid_fixed', 'size': 16, 'logicalType': 'uuid'}
   ```
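   
   For context, Iceberg stores UUIDs as a 16-byte fixed value, while plain Avro 
commonly attaches the `uuid` logical type to strings; a small illustrative 
sketch of the fixed[16] layout (the helper below is not PyIceberg or Iceberg 
API):
   
   ```
   import java.nio.ByteBuffer;
   import java.util.UUID;
   
   // Pack a UUID into the 16-byte big-endian layout that fixed[16] implies.
   static byte[] uuidToFixed16(UUID uuid) {
     ByteBuffer buffer = ByteBuffer.allocate(16);
     buffer.putLong(uuid.getMostSignificantBits());
     buffer.putLong(uuid.getLeastSignificantBits());
     return buffer.array();
   }
   ```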
   
   The full stacktrace:
   ```
   Traceback (most recent call last):
     File "/read.py", line 16, in <module>
       print(tbl.scan(selected_fields=("path","device")).to_arrow().to_pandas().head())
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 350, in to_arrow
       for task in self.plan_files():
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in plan_files
       yield from (FileScanTask(file) for file in matching_partition_files)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in <genexpr>
       yield from (FileScanTask(file) for file in matching_partition_files)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 149, in <genexpr>
       return (entry.data_file for entry in live_entries(input_file))
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 145, in <genexpr>
       return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 137, in read_manifest_entry
       with AvroFile(input_file) as reader:
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 136, in __enter__
       self.schema = self.header.get_schema()
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 85, in get_schema
       return AvroSchemaConversion().avro_to_iceberg(avro_schema)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 119, in avro_to_iceberg
       return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 119, in <listcomp>
       return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 227, in _convert_field
       field_type=self._convert_schema(plain_type),
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 196, in _convert_schema
       return self._convert_record_type(avro_type)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in _convert_record_type
       return StructType(*[self._convert_field(field) for field in record_type["fields"]])
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in <listcomp>
       return StructType(*[self._convert_field(field) for field in record_type["fields"]])
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 227, in _convert_field
       field_type=self._convert_schema(plain_type),
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 196, in _convert_schema
       return self._convert_record_type(avro_type)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in _convert_record_type
       return StructType(*[self._convert_field(field) for field in record_type["fields"]])
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/utils/schema_conversion.py", line 284, in <listcomp>
       retu
   ```

[GitHub] [iceberg] Fokko commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


Fokko commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353168942

   @cccs-eric I forgot one thing, could you also add the `adlfs` option to the 
docs in `python/mkdocs/`?





[GitHub] [iceberg] joshuarobinson opened a new issue, #6435: PyIceberg: Avro decode EOF error

2022-12-15 Thread GitBox


joshuarobinson opened a new issue, #6435:
URL: https://github.com/apache/iceberg/issues/6435

   ### Feature Request / Improvement
   
   When reading manifests for a table scan in PyIceberg 0.2.0, I get an 
EOFError.
   
   The table was originally written in June 2022 with the most recent version 
of Spark/Iceberg at that time (3.2.3 and 0.14.1, I think).
   
   Full traceback of the error:
   ```
   Traceback (most recent call last):
     File "/read.py", line 16, in <module>
       print(tbl.scan(selected_fields=("time", "icao24","call_sign","origin_country")).to_arrow().to_pandas().head())
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 350, in to_arrow
       for task in self.plan_files():
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in plan_files
       yield from (FileScanTask(file) for file in matching_partition_files)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 335, in <genexpr>
       yield from (FileScanTask(file) for file in matching_partition_files)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 149, in <genexpr>
       return (entry.data_file for entry in live_entries(input_file))
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 145, in <genexpr>
       return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/manifest.py", line 139, in read_manifest_entry
       for record in reader:
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 178, in __next__
       return self.__next__()
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 170, in __next__
       return next(self.block)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/file.py", line 106, in __next__
       return self.reader.read(self.block_decoder)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 283, in read
       result[pos] = field.read(decoder)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 283, in read
       result[pos] = field.read(decoder)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 267, in read
       return self.option.read(decoder)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 329, in read
       read_items[key] = self.value.read(decoder)
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/reader.py", line 234, in read
       return decoder.read_bytes()
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/decoder.py", line 122, in read_bytes
       return self.read(self.read_int())
     File "/usr/local/lib/python3.11/site-packages/pyiceberg/avro/decoder.py", line 58, in read
       raise EOFError
   EOFError
   ```
   
   The table can be read successfully with Trino.
   
   For interest, the schema of my table is:
   ```
   table {
 1: time: optional long
 2: icao24: optional string
 3: callsign: optional string
 4: origin_country: optional string
 5: time_position: optional double
 6: last_contact: optional long
 7: longitude: optional double
 8: latitude: optional double
 9: baro_altitude: optional double
 10: on_ground: optional boolean
 11: velocity: optional double
 12: true_track: optional double
 13: vertical_rate: optional double
 14: geo_altitude: optional double
 15: squawk: optional string
 16: spi: optional boolean
 17: position_source: optional long
   }
   ```
   
   ### Query engine
   
   None



[GitHub] [iceberg] joshuarobinson commented on issue #6435: PyIceberg: Avro decode EOF error

2022-12-15 Thread GitBox


joshuarobinson commented on issue #6435:
URL: https://github.com/apache/iceberg/issues/6435#issuecomment-1353182160

   The table in question has only one snapshot. I'm attaching the JSON and 
Avro metadata files for this table.
   
[iceberg_6435_metadata.zip](https://github.com/apache/iceberg/files/10237824/iceberg_6435_metadata.zip)
   





[GitHub] [iceberg] Fokko commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


Fokko commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353199075

   @cccs-eric Yes, please do!





[GitHub] [iceberg] nastra closed pull request #6436: Core: Add flag to control sending metric reports via REST

2022-12-15 Thread GitBox


nastra closed pull request #6436: Core: Add flag to control sending metric 
reports via REST
URL: https://github.com/apache/iceberg/pull/6436





[GitHub] [iceberg] stevenzwu merged pull request #6412: Doc: Modify some options refer to Read-options in flink streaming rea…

2022-12-15 Thread GitBox


stevenzwu merged PR #6412:
URL: https://github.com/apache/iceberg/pull/6412





[GitHub] [iceberg] stevenzwu commented on pull request #6412: Doc: Modify some options refer to Read-options in flink streaming rea…

2022-12-15 Thread GitBox


stevenzwu commented on PR #6412:
URL: https://github.com/apache/iceberg/pull/6412#issuecomment-1353410391

   thx @xwmr-max for the update





[GitHub] [iceberg] nastra commented on a diff in pull request #6399: API: Add strict metadata cleanup to SnapshotProducer

2022-12-15 Thread GitBox


nastra commented on code in PR #6399:
URL: https://github.com/apache/iceberg/pull/6399#discussion_r1049921925


##
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##
@@ -396,7 +399,9 @@ public void commit() {
     } catch (CommitStateUnknownException commitStateUnknownException) {
       throw commitStateUnknownException;
     } catch (RuntimeException e) {
-      Exceptions.suppressAndThrow(e, this::cleanAll);
+      if (!strictCleanup || e instanceof CleanableFailure) {
+        Exceptions.suppressAndThrow(e, this::cleanAll);

Review Comment:
   just curious, would we need to check for `strictCleanup` in 
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTransaction.java#L453-L463
 as well?






[GitHub] [iceberg] Fokko opened a new pull request, #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


Fokko opened a new pull request, #6437:
URL: https://github.com/apache/iceberg/pull/6437

   instead of name






[GitHub] [iceberg] szehon-ho commented on pull request #6419: Doc:Example of correcting the document add/drop partition truncate

2022-12-15 Thread GitBox


szehon-ho commented on PR #6419:
URL: https://github.com/apache/iceberg/pull/6419#issuecomment-1353527618

   I may be missing something, but don't both truncate(data, 4) and 
truncate(4, data) do the same thing?





[GitHub] [iceberg] flyrain commented on pull request #6427: Spark 3.2: Time range query of changelog tables

2022-12-15 Thread GitBox


flyrain commented on PR #6427:
URL: https://github.com/apache/iceberg/pull/6427#issuecomment-1353556854

   Thanks @szehon-ho.





[GitHub] [iceberg] flyrain merged pull request #6427: Spark 3.2: Time range query of changelog tables

2022-12-15 Thread GitBox


flyrain merged PR #6427:
URL: https://github.com/apache/iceberg/pull/6427





[GitHub] [iceberg] e-gat commented on issue #6421: Running rewriteDataFiles on multiple executors in Spark

2022-12-15 Thread GitBox


e-gat commented on issue #6421:
URL: https://github.com/apache/iceberg/issues/6421#issuecomment-1353571205

   After investigation, we found that the latest Iceberg versions support 
running rewriteDataFiles across multiple executors in Spark.
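
   For anyone landing here, a rough sketch of kicking off a distributed rewrite 
through the Spark actions API (hedged: the option shown is assumed from the 
RewriteDataFiles options, and `table` is an already-loaded Iceberg Table):

   ```
   import org.apache.iceberg.Table;
   import org.apache.iceberg.spark.actions.SparkActions;

   // Planning happens on the driver, but each file group is rewritten as a
   // Spark job, so the rewrite work is spread across the executors.
   SparkActions.get()
       .rewriteDataFiles(table)
       .option("max-concurrent-file-group-rewrites", "10") // assumed option name
       .execute();
   ```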





[GitHub] [iceberg] e-gat closed issue #6421: Running rewriteDataFiles on multiple executors in Spark

2022-12-15 Thread GitBox


e-gat closed issue #6421: Running rewriteDataFiles on multiple executors in 
Spark
URL: https://github.com/apache/iceberg/issues/6421





[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-15 Thread GitBox


flyrain commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050040299


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single Spark task.
+ *
+ * It marks the carry-over rows to null for filtering out later. Carry-over rows are unchanged
+ * rows in a snapshot but show up as delete-rows and insert-rows in a changelog table due to the
+ * copy-on-write (COW) mechanism. For example, given row1 (id=1, data='a') and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, COW will copy row1 to a new data file and
+ * delete the whole old data file. The changelog table will have two delete-rows (row1 and row2)
+ * and one insert-row (row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row as update-rows. For example, these two rows
+ *
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ *
+ * will be marked as update-rows:
+ *
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ */
+public class ChangelogIterator implements Iterator<Row>, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator<Row> rowIterator;
+  private final int changeTypeIndex;
+  private final List<Integer> partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+      Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> partitionIdx) {
+    this.rowIterator = rowIterator;
+    this.changeTypeIndex = changeTypeIndex;
+    this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRow != null) {
+      return true;
+    }
+    return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there is an updated cached row, return it directly
+    if (updated(cachedRow)) {
+      Row row = cachedRow;
+      cachedRow = null;
+      return row;
+    }
+
+    Row currentRow = currentRow();
+
+    if (rowIterator.hasNext()) {
+      GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+      cachedRow = nextRow;
+
+      if (updateOrCarryoverRecord(currentRow, nextRow)) {
+        Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+        currentRow = rows[0];
+        cachedRow = rows[1];
+      }
+    }
+
+    return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
+    GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
+    GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+    if (isCarryoverRecord(deletedRow, insertedRow)) {
+      // set carry-over rows to null for filtering out later
+      return new Row[] {null, null};

Review Comment:
   Was trying to make it more readable by returning 2 nulls. Will make the change.




[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-15 Thread GitBox


flyrain commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050041610


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single Spark task.
+ *
+ * It marks the carry-over rows to null for filtering out later. Carry-over rows are unchanged
+ * rows in a snapshot but show up as delete-rows and insert-rows in a changelog table due to the
+ * copy-on-write (COW) mechanism. For example, given row1 (id=1, data='a') and row2 (id=2,
+ * data='b') in a data file, if we only delete row2, COW will copy row1 to a new data file and
+ * delete the whole old data file. The changelog table will have two delete-rows (row1 and row2)
+ * and one insert-row (row1). Row1 is a carry-over row.
+ *
+ * The iterator marks the delete-row and insert-row as update-rows. For example, these two rows
+ *
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ *
+ * will be marked as update-rows:
+ *
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ */
+public class ChangelogIterator implements Iterator<Row>, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator<Row> rowIterator;
+  private final int changeTypeIndex;
+  private final List<Integer> partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+      Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> partitionIdx) {
+    this.rowIterator = rowIterator;
+    this.changeTypeIndex = changeTypeIndex;
+    this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (cachedRow != null) {
+      return true;
+    }
+    return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+    // if there is an updated cached row, return it directly
+    if (updated(cachedRow)) {
+      Row row = cachedRow;
+      cachedRow = null;
+      return row;
+    }
+
+    Row currentRow = currentRow();
+
+    if (rowIterator.hasNext()) {
+      GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+      cachedRow = nextRow;
+
+      if (updateOrCarryoverRecord(currentRow, nextRow)) {
+        Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+        currentRow = rows[0];
+        cachedRow = rows[1];
+      }
+    }
+
+    return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
+    GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
+    GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+    if (isCarryoverRecord(deletedRow, insertedRow)) {
+      // set carry-over rows to null for filtering out later
+      return new Row[] {null, null};
+    } else {
+      deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
+      insertedRow.update(changeTypeIndex, UPDATE_AFTER);
+
+      return new Row[] {
+        RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values())

Review Comment:
   `GenericInternalRow` is not a subclass of `Row`, so we need to return a 
`Row` instance here.




[GitHub] [iceberg] bondarenko commented on issue #6257: Partitions metadata table shows old partitions

2022-12-15 Thread GitBox


bondarenko commented on issue #6257:
URL: https://github.com/apache/iceberg/issues/6257#issuecomment-1353617197

   Looks like without `USING iceberg` you don't create an Iceberg table, so it 
doesn't even have to support updates, let alone the partitions metadata table.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] Fokko opened a new pull request, #6438: Python: Reduce the use of mock objects

2022-12-15 Thread GitBox


Fokko opened a new pull request, #6438:
URL: https://github.com/apache/iceberg/pull/6438

   We use mocks extensively in our Python code; this dates from before we had 
certain functionality available, such as a working FileIO.
   
   Instead of using the mocks, we can now use the actual code.





[GitHub] [iceberg] Fokko opened a new pull request, #6439: Python: Add pyparsing

2022-12-15 Thread GitBox


Fokko opened a new pull request, #6439:
URL: https://github.com/apache/iceberg/pull/6439

   This one was missing and was being pulled in transitively, I presume.





[GitHub] [iceberg] islamismailov commented on pull request #6353: Make sure S3 stream opened by ReadConf ctor is closed

2022-12-15 Thread GitBox


islamismailov commented on PR #6353:
URL: https://github.com/apache/iceberg/pull/6353#issuecomment-1353743389

   I will try to update this PR with the feedback provided.





[GitHub] [iceberg] cccs-eric commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


cccs-eric commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1353755124

   > @cccs-eric it looks like we have to add `pyparsing` to the 
`pyproject.toml`. I don't know why it was working before, but it should have 
been added in #6259
   
   @Fokko I don't understand what is going on.  `pyparsing` is part of 
`pyproject.toml` and when I run `make test-s3`, *it works on my machine* ™️ 





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6365: Core: Add position deletes metadata table

2022-12-15 Thread GitBox


RussellSpitzer commented on code in PR #6365:
URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050207289


##
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan>

Review Comment:
   Probably need a doc on this explaining why this class exists






[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6365: Core: Add position deletes metadata table

2022-12-15 Thread GitBox


RussellSpitzer commented on code in PR #6365:
URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050213598


##
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan>
+    extends BaseScan {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map options() {
+    return context().options();
+  }
+
+  protected abstract CloseableIterable doPlanFiles();
+
+  protected ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
+  @Override
+  public Table table() {

Review Comment:
   we are overriding this to make it public? Is this something other callers 
need?






[GitHub] [iceberg] rdblue commented on pull request #6425: Core: Unify fromJson(String) parsing

2022-12-15 Thread GitBox


rdblue commented on PR #6425:
URL: https://github.com/apache/iceberg/pull/6425#issuecomment-1353818545

   Thanks, @nastra! Good to have these cleaned up.





[GitHub] [iceberg] rdblue merged pull request #6425: Core: Unify fromJson(String) parsing

2022-12-15 Thread GitBox


rdblue merged PR #6425:
URL: https://github.com/apache/iceberg/pull/6425





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6365: Core: Add position deletes metadata table

2022-12-15 Thread GitBox


szehon-ho commented on code in PR #6365:
URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050217764


##
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan>
+extends BaseScan {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+  TableOperations ops, Table table, Schema schema, TableScanContext 
context) {
+super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {

Review Comment:
   Note: this is all just refactored from "BaseTableScan", so that 
PositionDeleteTableScan can share the code.






[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6365: Core: Add position deletes metadata table

2022-12-15 Thread GitBox


RussellSpitzer commented on code in PR #6365:
URL: https://github.com/apache/iceberg/pull/6365#discussion_r1050219983


##
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##
@@ -64,9 +64,12 @@ protected BaseMetadataTable(TableOperations ops, Table 
table, String name) {
*/
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec 
spec) {
 PartitionSpec.Builder identitySpecBuilder =
-PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
+PartitionSpec.builderFor(metadataTableSchema)
+.withSpecId(spec.specId())
+.checkConflicts(false);
 for (PartitionField field : spec.fields()) {
-  identitySpecBuilder.add(field.fieldId(), field.name(), 
Transforms.identity());
+  identitySpecBuilder.add(
+  field.fieldId(), field.fieldId(), field.name(), 
Transforms.identity());

Review Comment:
   Shouldn't this be field.sourceId(), field.fieldId()
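   
   A sketch of the suggested ordering, for reference (assuming the 
four-argument builder overload shown in the diff is 
`add(sourceId, fieldId, name, transform)`, which is exactly what is under 
review here):
   
   ```java
   // Hypothetical fix: pass the source column id first, then the field id.
   for (PartitionField field : spec.fields()) {
     identitySpecBuilder.add(
         field.sourceId(), field.fieldId(), field.name(), Transforms.identity());
   }
   ```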






[GitHub] [iceberg] rdblue commented on issue #6415: Vectorized Read Issue

2022-12-15 Thread GitBox


rdblue commented on issue #6415:
URL: https://github.com/apache/iceberg/issues/6415#issuecomment-1353829220

   I agree with the analysis that the problem is that we are returning 
dictionary-encoded Arrow vectors. Maybe we're not doing that the right way. 
I'll take a look at #3024.
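   
   For context, a hedged sketch of materializing a dictionary-encoded vector 
back into plain values with Arrow's Java `DictionaryEncoder`; whether this is 
the right direction for #6415 is exactly what's being investigated:
   
   ```java
   import org.apache.arrow.vector.ValueVector;
   import org.apache.arrow.vector.dictionary.Dictionary;
   import org.apache.arrow.vector.dictionary.DictionaryEncoder;
   
   public class DecodeSketch {
     // Decode index values through the dictionary so consumers that do not
     // understand dictionary encoding still see the logical values.
     public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
       return DictionaryEncoder.decode(indices, dictionary);
     }
   }
   ```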





[GitHub] [iceberg] sfc-gh-mparmar commented on a diff in pull request #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables

2022-12-15 Thread GitBox


sfc-gh-mparmar commented on code in PR #6428:
URL: https://github.com/apache/iceberg/pull/6428#discussion_r1050236173


##
snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java:
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {

Review Comment:
   For the first phase, we are supporting a small, read-only subset of catalog 
capabilities, which doesn't include the ability to create or modify 
namespaces, tables, etc., so we can't repurpose CatalogTests in this case yet.






[GitHub] [iceberg] github-actions[bot] closed issue #4931: Rewrite Data Files with Manual Sort Order should also use Table Partitioning in Sort Order

2022-12-15 Thread GitBox


github-actions[bot] closed issue #4931: Rewrite Data Files with Manual Sort 
Order should also use Table Partitioning in Sort Order
URL: https://github.com/apache/iceberg/issues/4931





[GitHub] [iceberg] github-actions[bot] commented on issue #4931: Rewrite Data Files with Manual Sort Order should also use Table Partitioning in Sort Order

2022-12-15 Thread GitBox


github-actions[bot] commented on issue #4931:
URL: https://github.com/apache/iceberg/issues/4931#issuecomment-1353917180

   This issue has been closed because it has not received any activity in the 
last 14 days since being marked as 'stale'








[GitHub] [iceberg] dennishuo commented on a diff in pull request #6428: Add new SnowflakeCatalog implementation to enable directly using Snowflake-managed Iceberg tables

2022-12-15 Thread GitBox


dennishuo commented on code in PR #6428:
URL: https://github.com/apache/iceberg/pull/6428#discussion_r1050257006


##
snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java:
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {

Review Comment:
   Good suggestion! I recall skimming over it when looking at the JdbcCatalog's 
unit test, but saw that most of the test cases were for (currently) unsupported 
functionality. Just for completeness, though, I went ahead and plugged it in 
(which helped identify the need to specify `test { useJUnitPlatform() }` in 
build.gradle to successfully inherit the tests) and verified that they all fail 
because they depend on at least `buildTable/createNamespace` to set up test 
data, as @sfc-gh-mparmar mentioned.
   
   One possibility, if we want to split out read-only variations even for 
temporary scenarios, would be to abstract the mutations into helper methods in 
the `CatalogTests` base class; only test cases that explicitly exercise 
mutations would call the catalog directly, while read-only catalogs perform 
the mutations in their fake in-memory data model.
   
   It seems like the `supports*` test methods also point towards extracting a 
more formalized way to define differences in supported functionality between 
catalogs, perhaps more fine-grained than what Java interfaces like 
`SupportsNamespaces` provide.
   
   Is there any work in progress to do something for Catalog interfaces 
analogous to the [HCFS efforts for 
HadoopFileSystem](https://cwiki.apache.org/confluence/display/HADOOP2/HCFS)?
   
   It also seems like CatalogTests may already have been written with 
[AbstractFSContractTestBase](https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java)
 in mind, in which case we could move towards analogous ["contract" 
definitions](https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/LocalFSContract.java)
 and [contract config 
specs](https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml).
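   
   As a rough illustration of the `supports*` idea in JUnit 5 terms (the class 
and method names here are entirely hypothetical):
   
   ```java
   import org.junit.jupiter.api.Assumptions;
   import org.junit.jupiter.api.Test;
   
   public abstract class CatalogContractSketch {
     // Subclasses advertise their capabilities; mutation tests skip themselves
     // when a capability is unsupported (e.g. a read-only catalog).
     protected boolean supportsNamespaceCreation() {
       return true;
     }
   
     @Test
     public void testCreateNamespace() {
       Assumptions.assumeTrue(supportsNamespaceCreation(), "catalog is read-only");
       // ... exercise createNamespace against the catalog under test
     }
   }
   ```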






[GitHub] [iceberg] jackye1995 merged pull request #6152: Docs: Update table snapshot retention property descriptions

2022-12-15 Thread GitBox


jackye1995 merged PR #6152:
URL: https://github.com/apache/iceberg/pull/6152





[GitHub] [iceberg] jackye1995 commented on pull request #6152: Docs: Update table snapshot retention property descriptions

2022-12-15 Thread GitBox


jackye1995 commented on PR #6152:
URL: https://github.com/apache/iceberg/pull/6152#issuecomment-1353975897

   Thanks for the fix, @amogh-jahagirdar, and thanks for the review, 
@singhpk234!





[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-15 Thread GitBox


flyrain commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050273317


##
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.iceberg.ChangelogOperation.DELETE;
+import static org.apache.iceberg.ChangelogOperation.INSERT;
+import static org.apache.iceberg.ChangelogOperation.UPDATE_AFTER;
+import static org.apache.iceberg.ChangelogOperation.UPDATE_BEFORE;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.junit.Test;
+
+public class TestChangelogIterator {
+  private final List<Row> rows =
+  Lists.newArrayList(
+  new GenericRowWithSchema(new Object[] {1, "a", "data", "DELETE"}, 
null),
+  new GenericRowWithSchema(new Object[] {1, "a", "new_data", 
"INSERT"}, null),
+  // next two rows belong to different partitions
+  new GenericRowWithSchema(new Object[] {2, "b", "data", "DELETE"}, 
null),
+  new GenericRowWithSchema(new Object[] {3, "c", "data", "INSERT"}, 
null),
+  new GenericRowWithSchema(new Object[] {4, "d", "data", "DELETE"}, 
null),
+  new GenericRowWithSchema(new Object[] {4, "d", "data", "INSERT"}, 
null));
+
+  private final int changeTypeIndex = 3;
+  private final List<Integer> partitionIdx = Lists.newArrayList(0, 1);
+
+  @Test
+  public void testUpdatedRows() {

Review Comment:
   Made the change.






[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-15 Thread GitBox


flyrain commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050273608


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows as null for filtering out later. Carry-over 
+ * rows are unchanged rows in a snapshot that appear as delete-rows and 
+ * insert-rows in a changelog table due to the copy-on-write (COW) mechanism. 
+ * For example, if row1 (id=1, data='a') and row2 (id=2, data='b') are in a 
+ * data file and we only delete row2, COW will copy row1 to a new data file 
+ * and delete the whole old data file. The changelog table will have two 
+ * delete-rows (row1 and row2) and one insert-row (row1). Row1 is a 
+ * carry-over row.
+ *
+ * The iterator marks a matching delete-row and insert-row pair as 
+ * update-rows. For example, these two rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator<Row>, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator<Row> rowIterator;
+  private final int changeTypeIndex;
+  private final List<Integer> partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};

Review Comment:
   A static method is used for concatenating two iterators. 
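   
   A minimal sketch of what such a helper might look like, assuming Guava's 
relocated `Iterators` utilities are used; the class and method names are 
illustrative, not the PR's actual API:
   
   ```java
   import java.util.Iterator;
   import java.util.Objects;
   import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
   import org.apache.spark.sql.Row;
   
   public class RowIterators {
     // Chain two row iterators and drop the nulls used to mark carry-over rows.
     public static Iterator<Row> concat(Iterator<Row> first, Iterator<Row> second) {
       return Iterators.filter(Iterators.concat(first, second), Objects::nonNull);
     }
   }
   ```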






[GitHub] [iceberg] flyrain commented on a diff in pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-15 Thread GitBox


flyrain commented on code in PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#discussion_r1050273739


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+
+/**
+ * An iterator that transforms rows from changelog tables within a single 
Spark task.
+ *
+ * It marks the carry-over rows as null for filtering out later. Carry-over 
+ * rows are unchanged rows in a snapshot that appear as delete-rows and 
+ * insert-rows in a changelog table due to the copy-on-write (COW) mechanism. 
+ * For example, if row1 (id=1, data='a') and row2 (id=2, data='b') are in a 
+ * data file and we only delete row2, COW will copy row1 to a new data file 
+ * and delete the whole old data file. The changelog table will have two 
+ * delete-rows (row1 and row2) and one insert-row (row1). Row1 is a 
+ * carry-over row.
+ *
+ * The iterator marks a matching delete-row and insert-row pair as 
+ * update-rows. For example, these two rows
+ *
+ * 
+ *   (id=1, data='a', op='DELETE')
+ *   (id=1, data='b', op='INSERT')
+ * 
+ *
+ * will be marked as update-rows:
+ *
+ * 
+ *   (id=1, data='a', op='UPDATE_BEFORE')
+ *   (id=1, data='b', op='UPDATE_AFTER')
+ * 
+ */
+public class ChangelogIterator implements Iterator<Row>, Serializable {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  private final Iterator<Row> rowIterator;
+  private final int changeTypeIndex;
+  private final List<Integer> partitionIdx;
+
+  private Row cachedRow = null;
+
+  public ChangelogIterator(
+  Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> partitionIdx) {
+this.rowIterator = rowIterator;
+this.changeTypeIndex = changeTypeIndex;
+this.partitionIdx = partitionIdx;
+  }
+
+  @Override
+  public boolean hasNext() {
+if (cachedRow != null) {
+  return true;
+}
+return rowIterator.hasNext();
+  }
+
+  @Override
+  public Row next() {
+// if there is an updated cached row, return it directly
+if (updated(cachedRow)) {
+  Row row = cachedRow;
+  cachedRow = null;
+  return row;
+}
+
+Row currentRow = currentRow();
+
+if (rowIterator.hasNext()) {
+  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();
+  cachedRow = nextRow;
+
+  if (updateOrCarryoverRecord(currentRow, nextRow)) {
+Row[] rows = update((GenericRowWithSchema) currentRow, nextRow);
+
+currentRow = rows[0];
+cachedRow = rows[1];
+  }
+}
+
+return currentRow;
+  }
+
+  private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema 
nextRow) {
+GenericInternalRow deletedRow = new 
GenericInternalRow(currentRow.values());
+GenericInternalRow insertedRow = new GenericInternalRow(nextRow.values());
+
+if (isCarryoverRecord(deletedRow, insertedRow)) {
+  // set carry-over rows to null for filtering out later
+  return new Row[] {null, null};
+} else {
+  deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
+  insertedRow.update(changeTypeIndex, UPDATE_AFTER);
+
+  return new Row[] {
+RowFactory.create(deletedRow.values()), 
RowFactory.create(insertedRow.values())
+  };
+}
+  }
+
+  private boolean isCarryoverRecord(GenericInternalRow deletedRow, 
GenericInternalRow insertedRow) {
+// set the change_type to the same value
+deletedRow.update(changeTypeIndex, "");
+insertedRow.update(changeTypeIndex, "");
+return deletedRow.equals(insertedRow);
+  }
+
+  private boolean updated(Row row) {

[GitHub] [iceberg] flyrain commented on pull request #6344: Spark 3.3: Introduce the changelog iterator

2022-12-15 Thread GitBox


flyrain commented on PR #6344:
URL: https://github.com/apache/iceberg/pull/6344#issuecomment-1354023746

   Thanks @szehon-ho and @RussellSpitzer for the review. Resolved all 
comments. Ready for another look.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

2022-12-15 Thread GitBox


rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050275269


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java:
##
@@ -246,18 +248,70 @@ private static Optional 
convertFieldAndLiteral(
 org.apache.flink.table.expressions.Expression left = args.get(0);
 org.apache.flink.table.expressions.Expression right = args.get(1);
 
-if (left instanceof FieldReferenceExpression && right instanceof 
ValueLiteralExpression) {
-  String name = ((FieldReferenceExpression) left).getName();
-  Optional lit = convertLiteral((ValueLiteralExpression) right);
+Optional lit;
+if (left instanceof FieldReferenceExpression) {
+  lit = convertExpression(right);
   if (lit.isPresent()) {
-return Optional.of(convertLR.apply(name, lit.get()));
+return Optional.of(convertLR.apply(((FieldReferenceExpression) 
left).getName(), lit.get()));

Review Comment:
   I don't see a reason to remove the `name` variable in this block or the 
mirror one. That introduces needless changes. Can you roll that change back?






[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

2022-12-15 Thread GitBox


rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277049


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java:
##
@@ -246,18 +248,70 @@ private static Optional 
convertFieldAndLiteral(
 org.apache.flink.table.expressions.Expression left = args.get(0);
 org.apache.flink.table.expressions.Expression right = args.get(1);
 
-if (left instanceof FieldReferenceExpression && right instanceof 
ValueLiteralExpression) {
-  String name = ((FieldReferenceExpression) left).getName();
-  Optional lit = convertLiteral((ValueLiteralExpression) right);
+Optional lit;
+if (left instanceof FieldReferenceExpression) {
+  lit = convertExpression(right);
   if (lit.isPresent()) {
-return Optional.of(convertLR.apply(name, lit.get()));
+return Optional.of(convertLR.apply(((FieldReferenceExpression) 
left).getName(), lit.get()));
   }
-} else if (left instanceof ValueLiteralExpression
-&& right instanceof FieldReferenceExpression) {
-  Optional lit = convertLiteral((ValueLiteralExpression) left);
-  String name = ((FieldReferenceExpression) right).getName();
+} else if (right instanceof FieldReferenceExpression) {
+  lit = convertExpression(left);
   if (lit.isPresent()) {
-return Optional.of(convertRL.apply(name, lit.get()));
+return Optional.of(
+convertRL.apply(((FieldReferenceExpression) right).getName(), 
lit.get()));
+  }
+}
+
+return Optional.empty();
+  }
+
+  private static Optional convertExpression(
+  org.apache.flink.table.expressions.Expression expression) {
+if (expression instanceof ValueLiteralExpression) {
+  return convertLiteral((ValueLiteralExpression) expression);
+} else if (expression instanceof CallExpression) {
+  return convertCallExpression((CallExpression) expression);
+}
+return Optional.empty();
+  }
+
+  private static Optional convertCallExpression(CallExpression call) {
+if (!BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) 
{
+  return Optional.empty();
+}
+
+List args = call.getResolvedChildren();
+if (args.size() != 2) {
+  return Optional.empty();
+}
+
+org.apache.flink.table.expressions.Expression left = args.get(0);
+org.apache.flink.table.expressions.Expression right = args.get(1);
+
+if (left instanceof ValueLiteralExpression && right instanceof 
TypeLiteralExpression) {
+  ValueLiteralExpression value = (ValueLiteralExpression) left;
+  TypeLiteralExpression type = (TypeLiteralExpression) right;
+
+  LogicalType logicalType = type.getOutputDataType().getLogicalType();
+
+  Optional result = 
value.getValueAs(logicalType.getDefaultConversion());
+  if (result.isPresent()) {
+return Optional.of(result.get());
+  }
+
+  switch (logicalType.getTypeRoot()) {
+case DOUBLE:
+  Optional<String> strValue = value.getValueAs(String.class);
+  if (strValue.isPresent()) {
+return Optional.of(Double.valueOf(strValue.get()));

Review Comment:
   I think that this needs to handle `NumberFormatException` in case someone 
uses an expression like `cast("fail" as double)`. You could put a try/except 
around the switch and return `Optional.empty`.
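   
   A minimal illustration of the suggested guard; the helper name and 
placement are hypothetical:
   
   ```java
   import java.util.Optional;
   
   public class CastLiteralGuard {
     // Parsing a cast literal such as cast('fail' as double) should not throw;
     // returning empty simply skips filter push-down for that predicate.
     static Optional<Object> parseDouble(String raw) {
       try {
         return Optional.of(Double.valueOf(raw));
       } catch (NumberFormatException e) {
         return Optional.empty();
       }
     }
   }
   ```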






[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

2022-12-15 Thread GitBox


rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277901


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##
@@ -603,7 +603,108 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-// todo add some test case to test NaN
+  public void testFilterNaN() {
+final String tableName = "test_table_nan";
+try {
+  sql(
+  "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH 
('write.format.default'='%s')",
+  tableName, format.name());
+  sql(
+  "INSERT INTO %s VALUES (1,'iceberg',10, 
1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS 
DOUBLE),4.4)",
+  tableName);
+
+  String sqlNaNDoubleEqual =
+  String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", 
tableName);
+  List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+  Assert.assertEquals("Should have 0 records", 0, 
resultDoubleEqual.size());
+
+  String sqlNaNDoubleNotEqual =
+  String.format("SELECT * FROM %s  WHERE d <> CAST('NaN' AS DOUBLE)  
", tableName);
+  List<Row> resultDoubleNotEqual = sql(sqlNaNDoubleNotEqual);
+  List<Row> expectedDouble =
+  Lists.newArrayList(
+  Row.of(1, "iceberg", 10.0d, 1.1f),
+  Row.of(2, "b", 20.0d, 2.2f),
+  Row.of(3, null, 30.0d, 3.3f),
+  Row.of(4, "d", Double.NaN, 4.4f));

Review Comment:
   Why is this matching? The field is NaN.






[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

2022-12-15 Thread GitBox


rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050278409


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##
@@ -603,7 +603,108 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-// todo add some test case to test NaN
+  public void testFilterNaN() {
+final String tableName = "test_table_nan";
+try {
+  sql(
+  "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH 
('write.format.default'='%s')",
+  tableName, format.name());
+  sql(
+  "INSERT INTO %s VALUES (1,'iceberg',10, 
1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS 
DOUBLE),4.4)",
+  tableName);
+
+  String sqlNaNDoubleEqual =
+  String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", 
tableName);
+  List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+  Assert.assertEquals("Should have 0 records", 0, 
resultDoubleEqual.size());

Review Comment:
   This should have 1 record.






[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

2022-12-15 Thread GitBox


rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050278958


##
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-// todo add some test case to test NaN
+  public void testFilterNaN() {
+sql(
+"CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH 
('write.format.default'='%s')",
+TABLE_NAME_NAN, format.name());
+sql(
+"INSERT INTO %s VALUES (1,'iceberg',10, 
1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS 
DOUBLE),4.4)",
+TABLE_NAME_NAN);
+
+String sqlNaNDoubleEqual =
+String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", 
TABLE_NAME_NAN);
+List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());

Review Comment:
   See my note below on this.






[GitHub] [iceberg] rdblue commented on pull request #6402: Flink: Add UT for NaN

2022-12-15 Thread GitBox


rdblue commented on PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#issuecomment-1354038557

   @hililiwei, I flagged the test cases in my review, but I now see that 
@stevenzwu did as well.
   
   The problem is that NaN comparison should always result in `false`. That's 
why Iceberg doesn't allow `NaN` as a literal value in expressions. If you try 
to create an expression with a [`NaN` literal, it will 
fail](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/expressions/Literals.java#L61).
   
   That forces callers to be specific. If the caller intends to compare with 
`NaN`, then the comparison should be `false` and there's no need to pass it to 
Iceberg. If the caller intends to check whether a value is `NaN`, then there is 
the [`isNaN` unary 
predicate](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/expressions/Expressions.java#LL125C28-L125C28).
   
   It is up to Flink how to map expressions. In Spark, `NaN` is a valid 
comparison, so Spark filter translation converts `x = NaN` to `isNaN(x)`. Flink 
could do something similar.
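   
   A small sketch of that translation using Iceberg's public expression API 
(the engine-side rewrite itself is hypothetical):
   
   ```java
   import org.apache.iceberg.expressions.Expression;
   import org.apache.iceberg.expressions.Expressions;
   
   public class NaNTranslation {
     // Rewrite `column = literal` for doubles: NaN comparisons become the unary
     // isNaN predicate, since a NaN literal is rejected by Expressions.equal.
     public static Expression equalsOrIsNaN(String column, double literal) {
       if (Double.isNaN(literal)) {
         return Expressions.isNaN(column);
       }
       return Expressions.equal(column, literal);
     }
   }
   ```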





[GitHub] [iceberg] rdblue merged pull request #6439: Python: Add pyparsing

2022-12-15 Thread GitBox


rdblue merged PR #6439:
URL: https://github.com/apache/iceberg/pull/6439





[GitHub] [iceberg] rdblue commented on pull request #6439: Python: Add pyparsing

2022-12-15 Thread GitBox


rdblue commented on PR #6439:
URL: https://github.com/apache/iceberg/pull/6439#issuecomment-1354039711

   I think this was my mistake. I thought it was part of the standard library 
since I didn't need to install it.





[GitHub] [iceberg] rdblue commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


rdblue commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1354040055

   @cccs-eric, I just merged #6439, which fixes the pyparsing problem (my 
fault), so you should be able to rebase and get tests working. Sorry about that!





[GitHub] [iceberg] rdblue commented on a diff in pull request #6074: API,Core: SnapshotManager to be created through Transaction

2022-12-15 Thread GitBox


rdblue commented on code in PR #6074:
URL: https://github.com/apache/iceberg/pull/6074#discussion_r1050282810


##
core/src/main/java/org/apache/iceberg/SnapshotManager.java:
##
@@ -30,6 +31,17 @@ public class SnapshotManager implements ManageSnapshots {
 ops.current() != null, "Cannot manage snapshots: table %s does not 
exist", tableName);
 this.transaction =
 new BaseTransaction(tableName, ops, 
BaseTransaction.TransactionType.SIMPLE, ops.refresh());
+this.isExternalTransaction = false;
+  }
+
+  SnapshotManager(BaseTransaction transaction) {
+Preconditions.checkNotNull(transaction, "Input transaction cannot be 
null");
+Preconditions.checkNotNull(

Review Comment:
   @gaborkaszab, this is what I'm talking about with the create transaction 
comment. Why is this precondition needed?






[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050284139


##
python/pyiceberg/expressions/visitors.py:
##
@@ -753,3 +757,68 @@ def inclusive_projection(
 schema: Schema, spec: PartitionSpec, case_sensitive: bool = True
 ) -> Callable[[BooleanExpression], BooleanExpression]:
 return InclusiveProjection(schema, spec, case_sensitive).project
+
+
+class _ExpressionProjector(BooleanExpressionVisitor[BooleanExpression]):
+"""Rewrites a boolean expression by replacing unbound references with 
references to fields in a struct schema
+
+Args:
+  table_schema (Schema): The schema of the table
+  file_schema (Schema): The schema of the file
+  case_sensitive (bool): Whether to consider case when binding a reference 
to a field in a schema, defaults to True
+
+Raises:
+TypeError: In the case a predicate is already bound
+"""
+
+table_schema: Schema
+file_schema: Schema
+case_sensitive: bool
+
+def __init__(self, table_schema: Schema, file_schema: Schema, 
case_sensitive: bool) -> None:
+self.table_schema = table_schema
+self.file_schema = file_schema
+self.case_sensitive = case_sensitive
+
+def visit_true(self) -> BooleanExpression:
+return AlwaysTrue()
+
+def visit_false(self) -> BooleanExpression:
+return AlwaysFalse()
+
+def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+return Not(child=child_result)
+
+def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+return And(left=left_result, right=right_result)
+
+def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+return Or(left=left_result, right=right_result)
+
+def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> 
BooleanExpression:
+if not isinstance(predicate.term, Reference):
+raise ValueError(f"Exprected reference: {predicate.term}")
+
+field = self.table_schema.find_field(predicate.term.name, 
case_sensitive=self.case_sensitive)
+file_column_name = self.file_schema.find_column_name(field.field_id)
+
+if not file_column_name:
+raise ValueError(f"Field id {field.field_id} not found in file schema")
+
+if isinstance(predicate, UnaryPredicate):
+return predicate.__class__(Reference(file_column_name))

Review Comment:
   Shouldn't this be `predicate.as_unbound(Reference(file_column_name))`?






[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050284725


##
python/pyiceberg/expressions/visitors.py:
##
@@ -753,3 +757,68 @@ def inclusive_projection(
 schema: Schema, spec: PartitionSpec, case_sensitive: bool = True
 ) -> Callable[[BooleanExpression], BooleanExpression]:
 return InclusiveProjection(schema, spec, case_sensitive).project
+
+
+class _ExpressionProjector(BooleanExpressionVisitor[BooleanExpression]):
+"""Rewrites a boolean expression by replacing unbound references with 
references to fields in a struct schema
+
+Args:
+  table_schema (Schema): The schema of the table
+  file_schema (Schema): The schema of the file
+  case_sensitive (bool): Whether to consider case when binding a reference 
to a field in a schema, defaults to True
+
+Raises:
+TypeError: In the case a predicate is already bound
+"""
+
+table_schema: Schema
+file_schema: Schema
+case_sensitive: bool
+
+def __init__(self, table_schema: Schema, file_schema: Schema, 
case_sensitive: bool) -> None:
+self.table_schema = table_schema
+self.file_schema = file_schema
+self.case_sensitive = case_sensitive
+
+def visit_true(self) -> BooleanExpression:
+return AlwaysTrue()
+
+def visit_false(self) -> BooleanExpression:
+return AlwaysFalse()
+
+def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
+return Not(child=child_result)
+
+def visit_and(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+return And(left=left_result, right=right_result)
+
+def visit_or(self, left_result: BooleanExpression, right_result: 
BooleanExpression) -> BooleanExpression:
+return Or(left=left_result, right=right_result)
+
+def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> 
BooleanExpression:
+if not isinstance(predicate.term, Reference):
+raise ValueError(f"Exprected reference: {predicate.term}")
+
+field = self.table_schema.find_field(predicate.term.name, 
case_sensitive=self.case_sensitive)
+file_column_name = self.file_schema.find_column_name(field.field_id)
+
+if not file_column_name:
+raise ValueError(f"Field id {field.field_id} not found in file schema")
+
+if isinstance(predicate, UnaryPredicate):
+return predicate.__class__(Reference(file_column_name))

Review Comment:
   Oh, nevermind. I see that the input expression is unbound. I'm not sure 
whether it would be better to do this to a bound expression or not. That seems 
like good separation of responsibilities and a good way to have consistent 
error messages when an expression isn't valid for a particular schema.






[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050285350


##
python/pyiceberg/expressions/visitors.py:
##
@@ -753,3 +757,68 @@ def inclusive_projection(
 schema: Schema, spec: PartitionSpec, case_sensitive: bool = True
 ) -> Callable[[BooleanExpression], BooleanExpression]:
 return InclusiveProjection(schema, spec, case_sensitive).project
+
+
+class _ExpressionProjector(BooleanExpressionVisitor[BooleanExpression]):

Review Comment:
   We already use "project" to describe producing an expression for transformed 
values, so I think it would be confusing to name this "projector". What about 
something like "name translator"? I think mentioning that it rewrites 
expression names is a good thing.






[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

2022-12-15 Thread GitBox


yegangy0718 commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1050285783


##
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Shuffle operator can help to improve data clustering based on the key.
+ *
+ * <p>It collects data statistics, sends them to the coordinator, and gets the global data
+ * distribution weight back from the coordinator. Then it ingests the weight into the data
+ * stream (wrapped by {@link ShuffleRecordWrapper}) and sends it to the partitioner.
+ */
+@Internal
+public class ShuffleOperator<T, K>
+extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>>
+implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  private final KeySelector<T, K> keySelector;
+  // the type of the key to collect data statistics
+  private final TypeInformation<K> keyType;
+  private final OperatorEventGateway operatorEventGateway;
+  // key is generated by applying KeySelector to record
+  // value is the number of times the key occurs
+  private transient Map<K, Long> localDataStatisticsMap;
+  private transient Map<K, Long> globalDataDistributionWeightMap;
+  private transient ListState<Map<K, Long>> globalDataDistributionWeightState;
+
+  public ShuffleOperator(
+  KeySelector<T, K> keySelector,
+  TypeInformation<K> keyType,
+  OperatorEventGateway operatorEventGateway) {
+this.keySelector = keySelector;
+this.keyType = keyType;
+this.operatorEventGateway = operatorEventGateway;
+  }
+
+  @VisibleForTesting
+  ListStateDescriptor<Map<K, Long>> generateGlobalDataDistributionWeightDescriptor() {
+return new ListStateDescriptor<>(
+"globalDataDistributionWeight", new MapTypeInfo<>(keyType, TypeInformation.of(Long.class)));
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+localDataStatisticsMap = Maps.newHashMap();
+globalDataDistributionWeightState =

Review Comment:
   SGTM. I can create a follow-up PR to add a scale-up test case to check the 
behavior of BroadcastState.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050286173


##
python/pyiceberg/io/pyarrow.py:
##
@@ -437,3 +457,103 @@ def visit_or(self, left_result: pc.Expression, 
right_result: pc.Expression) -> p
 
 def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
 return boolean_expression_visit(expr, _ConvertToArrowExpression())
+
+
+class _ConstructFinalSchema(SchemaVisitor[pa.ChunkedArray]):
+file_schema: Schema
+table: pa.Table
+
+def __init__(self, file_schema: Schema, table: pa.Table):
+self.file_schema = file_schema
+self.table = table
+
+def schema(self, schema: Schema, struct_result: List[pa.ChunkedArray]) -> 
pa.Table:
+return pa.table(struct_result, schema=schema_to_pyarrow(schema))
+
+def struct(self, _: StructType, field_results: List[pa.ChunkedArray]) -> 
List[pa.ChunkedArray]:
+return field_results
+
+def field(self, field: NestedField, _: pa.ChunkedArray) -> pa.ChunkedArray:
+column_name = self.file_schema.find_column_name(field.field_id)
+
+if column_name:
+column_idx = self.table.schema.get_field_index(column_name)
+else:
+column_idx = -1
+
+expected_arrow_type = schema_to_pyarrow(field.field_type)
+
+# The idx will be -1 when the column can't be found
+if column_idx >= 0:
+column_field: pa.Field = self.table.schema[column_idx]
+column_arrow_type: pa.DataType = column_field.type
+column_data: pa.ChunkedArray = self.table[column_idx]
+
+# In case of schema evolution
+if column_arrow_type != expected_arrow_type:
+column_data = column_data.cast(expected_arrow_type)
+else:
+import numpy as np
+
+column_data = pa.array(np.full(shape=len(self.table), 
fill_value=None), type=expected_arrow_type)
+return column_data
+
+def list(self, _: ListType, element_result: pa.ChunkedArray) -> 
pa.ChunkedArray:
+pass
+
+def map(self, _: MapType, key_result: pa.ChunkedArray, value_result: 
pa.ChunkedArray) -> pa.DataType:
+pass
+
+def primitive(self, primitive: PrimitiveType) -> pa.ChunkedArray:
+pass
+
+
+def to_final_schema(final_schema: Schema, schema: Schema, table: pa.Table) -> 
pa.Table:
+return visit(final_schema, _ConstructFinalSchema(schema, table))
+
+
+def project_table(
+files: Iterable["FileScanTask"], table: "Table", row_filter: 
BooleanExpression, projected_schema: Schema, case_sensitive: bool
+) -> pa.Table:
+if isinstance(table.io, PyArrowFileIO):
+scheme, path = PyArrowFileIO.parse_location(table.location())
+fs = table.io.get_fs(scheme)
+else:
+raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")
+
+projected_field_ids = projected_schema.field_ids
+
+tables = []
+for task in files:
+_, path = PyArrowFileIO.parse_location(task.file.file_path)
+
+# Get the schema
+with fs.open_input_file(path) as fout:
+parquet_schema = pq.read_schema(fout)
+schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA)
+file_schema = Schema.parse_raw(schema_raw)
+
+file_project_schema = prune_columns(file_schema, projected_field_ids)
+
+pyarrow_filter = None
+if row_filter is not AlwaysTrue():
+row_filter = project_expression(row_filter, table.schema(), 
file_schema, case_sensitive=case_sensitive)
+bound_row_filter = bind(file_schema, row_filter, 
case_sensitive=case_sensitive)
+pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+if file_schema is None:
+raise ValueError(f"Iceberg schema not encoded in Parquet file: 
{path}")

Review Comment:
   `Missing Iceberg schema for file: {path}`? That avoids the negation and the 
need to understand why a schema would be "encoded" in a file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050287321


##
python/pyiceberg/io/pyarrow.py:
##
@@ -437,3 +457,103 @@ def visit_or(self, left_result: pc.Expression, 
right_result: pc.Expression) -> p
 
 def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
 return boolean_expression_visit(expr, _ConvertToArrowExpression())
+
+
+class _ConstructFinalSchema(SchemaVisitor[pa.ChunkedArray]):
+file_schema: Schema
+table: pa.Table
+
+def __init__(self, file_schema: Schema, table: pa.Table):
+self.file_schema = file_schema
+self.table = table
+
+def schema(self, schema: Schema, struct_result: List[pa.ChunkedArray]) -> 
pa.Table:
+return pa.table(struct_result, schema=schema_to_pyarrow(schema))
+
+def struct(self, _: StructType, field_results: List[pa.ChunkedArray]) -> 
List[pa.ChunkedArray]:
+return field_results
+
+def field(self, field: NestedField, _: pa.ChunkedArray) -> pa.ChunkedArray:
+column_name = self.file_schema.find_column_name(field.field_id)
+
+if column_name:
+column_idx = self.table.schema.get_field_index(column_name)
+else:
+column_idx = -1
+
+expected_arrow_type = schema_to_pyarrow(field.field_type)
+
+# The idx will be -1 when the column can't be found
+if column_idx >= 0:
+column_field: pa.Field = self.table.schema[column_idx]
+column_arrow_type: pa.DataType = column_field.type
+column_data: pa.ChunkedArray = self.table[column_idx]
+
+# In case of schema evolution
+if column_arrow_type != expected_arrow_type:
+column_data = column_data.cast(expected_arrow_type)
+else:
+import numpy as np
+
+column_data = pa.array(np.full(shape=len(self.table), 
fill_value=None), type=expected_arrow_type)
+return column_data
+
+def list(self, _: ListType, element_result: pa.ChunkedArray) -> 
pa.ChunkedArray:
+pass
+
+def map(self, _: MapType, key_result: pa.ChunkedArray, value_result: 
pa.ChunkedArray) -> pa.DataType:
+pass
+
+def primitive(self, primitive: PrimitiveType) -> pa.ChunkedArray:
+pass
+
+
+def to_final_schema(final_schema: Schema, schema: Schema, table: pa.Table) -> 
pa.Table:
+return visit(final_schema, _ConstructFinalSchema(schema, table))
+
+
+def project_table(
+files: Iterable["FileScanTask"], table: "Table", row_filter: 
BooleanExpression, projected_schema: Schema, case_sensitive: bool
+) -> pa.Table:
+if isinstance(table.io, PyArrowFileIO):
+scheme, path = PyArrowFileIO.parse_location(table.location())
+fs = table.io.get_fs(scheme)
+else:
+raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")
+
+projected_field_ids = projected_schema.field_ids
+
+tables = []
+for task in files:
+_, path = PyArrowFileIO.parse_location(task.file.file_path)
+
+# Get the schema
+with fs.open_input_file(path) as fout:
+parquet_schema = pq.read_schema(fout)
+schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA)
+file_schema = Schema.parse_raw(schema_raw)
+
+file_project_schema = prune_columns(file_schema, projected_field_ids)
+
+pyarrow_filter = None
+if row_filter is not AlwaysTrue():
+row_filter = project_expression(row_filter, table.schema(), 
file_schema, case_sensitive=case_sensitive)
+bound_row_filter = bind(file_schema, row_filter, 
case_sensitive=case_sensitive)
+pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+if file_schema is None:
+raise ValueError(f"Iceberg schema not encoded in Parquet file: 
{path}")
+
+# Prune the stuff that we don't need anyway
+file_project_schema_arrow = schema_to_pyarrow(file_project_schema)
+
+arrow_table = ds.dataset(

Review Comment:
   What happens when one file has a column but others don't? For example, if I 
run `ALTER TABLE t ADD COLUMN c int` and read files written before and after 
the change? And what happens if we also end up with mixed types because `c` 
was promoted from `int` to `long`?
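   For illustration, one way a reader could normalize each per-file table before concatenation, using plain pyarrow (the helper name and `target` argument below are hypothetical, not part of this PR):

   ```python
   import pyarrow as pa


   def align_to_schema(file_table: pa.Table, target: pa.Schema) -> pa.Table:
       # Sketch: reorder, cast, and null-fill columns so that every per-file
       # table matches the requested schema before pa.concat_tables.
       columns = []
       for field in target:
           if field.name in file_table.column_names:
               column = file_table[field.name]
               # Covers type promotion, e.g. int32 -> int64 after `c` changed.
               if column.type != field.type:
                   column = column.cast(field.type)
           else:
               # Column added by a later ALTER TABLE: fill with typed nulls.
               column = pa.nulls(file_table.num_rows, type=field.type)
           columns.append(column)
       return pa.Table.from_arrays(columns, schema=target)
   ```

   With every per-file table aligned this way, the final `pa.concat_tables` call never sees missing columns or mixed types.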



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050287724


##
python/pyiceberg/io/pyarrow.py:
##
@@ -437,3 +457,103 @@ def visit_or(self, left_result: pc.Expression, 
right_result: pc.Expression) -> p
 
 def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
 return boolean_expression_visit(expr, _ConvertToArrowExpression())
+
+
+class _ConstructFinalSchema(SchemaVisitor[pa.ChunkedArray]):
+file_schema: Schema
+table: pa.Table
+
+def __init__(self, file_schema: Schema, table: pa.Table):
+self.file_schema = file_schema
+self.table = table
+
+def schema(self, schema: Schema, struct_result: List[pa.ChunkedArray]) -> 
pa.Table:
+return pa.table(struct_result, schema=schema_to_pyarrow(schema))
+
+def struct(self, _: StructType, field_results: List[pa.ChunkedArray]) -> 
List[pa.ChunkedArray]:
+return field_results
+
+def field(self, field: NestedField, _: pa.ChunkedArray) -> pa.ChunkedArray:
+column_name = self.file_schema.find_column_name(field.field_id)
+
+if column_name:
+column_idx = self.table.schema.get_field_index(column_name)
+else:
+column_idx = -1
+
+expected_arrow_type = schema_to_pyarrow(field.field_type)
+
+# The idx will be -1 when the column can't be found
+if column_idx >= 0:
+column_field: pa.Field = self.table.schema[column_idx]
+column_arrow_type: pa.DataType = column_field.type
+column_data: pa.ChunkedArray = self.table[column_idx]
+
+# In case of schema evolution
+if column_arrow_type != expected_arrow_type:
+column_data = column_data.cast(expected_arrow_type)
+else:
+import numpy as np
+
+column_data = pa.array(np.full(shape=len(self.table), 
fill_value=None), type=expected_arrow_type)
+return column_data
+
+def list(self, _: ListType, element_result: pa.ChunkedArray) -> 
pa.ChunkedArray:
+pass
+
+def map(self, _: MapType, key_result: pa.ChunkedArray, value_result: 
pa.ChunkedArray) -> pa.DataType:
+pass
+
+def primitive(self, primitive: PrimitiveType) -> pa.ChunkedArray:
+pass
+
+
+def to_final_schema(final_schema: Schema, schema: Schema, table: pa.Table) -> 
pa.Table:
+return visit(final_schema, _ConstructFinalSchema(schema, table))
+
+
+def project_table(
+files: Iterable["FileScanTask"], table: "Table", row_filter: 
BooleanExpression, projected_schema: Schema, case_sensitive: bool
+) -> pa.Table:
+if isinstance(table.io, PyArrowFileIO):
+scheme, path = PyArrowFileIO.parse_location(table.location())
+fs = table.io.get_fs(scheme)
+else:
+raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")
+
+projected_field_ids = projected_schema.field_ids
+
+tables = []
+for task in files:

Review Comment:
   I think the inner part of this loop should be a Parquet method that we 
provide, so that the caller can read progressively or read parts in parallel 
tasks. This is a great start for a single process, though.
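   A rough sketch of that factoring (the `read_one` callable stands in for the extracted per-file method; all names below are illustrative):

   ```python
   from concurrent.futures import ThreadPoolExecutor

   import pyarrow as pa


   def project_table_parallel(tasks, read_one, max_workers=8) -> pa.Table:
       # read_one: FileScanTask -> pa.Table, i.e. the body of the loop above.
       # A thread pool reads the files in parallel; iterating executor.map
       # lazily instead of materializing a list would allow progressive reads.
       with ThreadPoolExecutor(max_workers=max_workers) as executor:
           tables = list(executor.map(read_one, tasks))
       return pa.concat_tables(tables)
   ```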



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] rdblue commented on a diff in pull request #6437: Python: Projection by Field ID

2022-12-15 Thread GitBox


rdblue commented on code in PR #6437:
URL: https://github.com/apache/iceberg/pull/6437#discussion_r1050288238


##
python/pyiceberg/io/pyarrow.py:
##
@@ -437,3 +457,103 @@ def visit_or(self, left_result: pc.Expression, 
right_result: pc.Expression) -> p
 
 def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
 return boolean_expression_visit(expr, _ConvertToArrowExpression())
+
+
+class _ConstructFinalSchema(SchemaVisitor[pa.ChunkedArray]):
+file_schema: Schema
+table: pa.Table
+
+def __init__(self, file_schema: Schema, table: pa.Table):
+self.file_schema = file_schema
+self.table = table
+
+def schema(self, schema: Schema, struct_result: List[pa.ChunkedArray]) -> 
pa.Table:
+return pa.table(struct_result, schema=schema_to_pyarrow(schema))
+
+def struct(self, _: StructType, field_results: List[pa.ChunkedArray]) -> 
List[pa.ChunkedArray]:
+return field_results
+
+def field(self, field: NestedField, _: pa.ChunkedArray) -> pa.ChunkedArray:
+column_name = self.file_schema.find_column_name(field.field_id)
+
+if column_name:
+column_idx = self.table.schema.get_field_index(column_name)
+else:
+column_idx = -1
+
+expected_arrow_type = schema_to_pyarrow(field.field_type)
+
+# The idx will be -1 when the column can't be found
+if column_idx >= 0:
+column_field: pa.Field = self.table.schema[column_idx]
+column_arrow_type: pa.DataType = column_field.type
+column_data: pa.ChunkedArray = self.table[column_idx]
+
+# In case of schema evolution
+if column_arrow_type != expected_arrow_type:
+column_data = column_data.cast(expected_arrow_type)
+else:
+import numpy as np
+
+column_data = pa.array(np.full(shape=len(self.table), 
fill_value=None), type=expected_arrow_type)
+return column_data
+
+def list(self, _: ListType, element_result: pa.ChunkedArray) -> 
pa.ChunkedArray:
+pass
+
+def map(self, _: MapType, key_result: pa.ChunkedArray, value_result: 
pa.ChunkedArray) -> pa.DataType:
+pass
+
+def primitive(self, primitive: PrimitiveType) -> pa.ChunkedArray:
+pass
+
+
+def to_final_schema(final_schema: Schema, schema: Schema, table: pa.Table) -> 
pa.Table:
+return visit(final_schema, _ConstructFinalSchema(schema, table))
+
+
+def project_table(
+files: Iterable["FileScanTask"], table: "Table", row_filter: 
BooleanExpression, projected_schema: Schema, case_sensitive: bool
+) -> pa.Table:
+if isinstance(table.io, PyArrowFileIO):
+scheme, path = PyArrowFileIO.parse_location(table.location())
+fs = table.io.get_fs(scheme)
+else:
+raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")
+
+projected_field_ids = projected_schema.field_ids
+
+tables = []
+for task in files:
+_, path = PyArrowFileIO.parse_location(task.file.file_path)
+
+# Get the schema
+with fs.open_input_file(path) as fout:
+parquet_schema = pq.read_schema(fout)
+schema_raw = parquet_schema.metadata.get(ICEBERG_SCHEMA)
+file_schema = Schema.parse_raw(schema_raw)
+
+file_project_schema = prune_columns(file_schema, projected_field_ids)

Review Comment:
   This is going to produce a subset of the requested schema. If the file has 
columns `a` and `b`, but the requested schema has `a`, `b`, and `c`, then this 
will only have the columns from the file and will produce a dataset with a 
missing column. That's okay if Arrow knows how to handle it below in 
`concat_tables`, I think.
   
   We just need to be careful that we produce an Arrow table that matches 
the requested schema, not just the file schemas.
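   A hedged sketch of that guard, assuming `expected` comes from `schema_to_pyarrow(projected_schema)`; the helper name below is illustrative:

   ```python
   import pyarrow as pa


   def concat_to_requested(tables: list, expected: pa.Schema) -> pa.Table:
       # promote=True unifies differing per-file schemas, null-filling any
       # column that some files lack (e.g. one added by a later ALTER TABLE).
       combined = pa.concat_tables(tables, promote=True)
       # Coerce to the requested schema so callers never get a file-dependent
       # subset; Table.cast assumes the same column names in the same order.
       return combined if combined.schema == expected else combined.cast(expected)
   ```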



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] cccs-eric commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


cccs-eric commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1354058385

   > @cccs-eric, I just merged #6439 that fixes the pyparsing problem (my 
fault), so you should be able to rebase and get tests working. Sorry about that!
   
   Thanks @rdblue, the build is now passing 🥳 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] cccs-eric commented on pull request #6392: Python: Add adlfs support (Azure DataLake FileSystem)

2022-12-15 Thread GitBox


cccs-eric commented on PR #6392:
URL: https://github.com/apache/iceberg/pull/6392#issuecomment-1354060350

   @Fokko One last thing that I haven't done is modify verify-release.md for 
the integration tests. Should I add `test-adlfs` there?
   
https://github.com/apache/iceberg/blob/master/python/mkdocs/docs/verify-release.md?plain=1#L86-L90


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] xwmr-max opened a new pull request, #6440: Flink: Support Look-up Function

2022-12-15 Thread GitBox


xwmr-max opened a new pull request, #6440:
URL: https://github.com/apache/iceberg/pull/6440

   Currently, Iceberg does not support look-up join. This PR adds a look-up 
join function for Flink to cover basic join scenarios.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org