[GitHub] [iceberg] Shane-Yu opened a new issue, #5671: The upsert mode can query the historical version of the data under certain conditions

2022-08-30 Thread GitBox


Shane-Yu opened a new issue, #5671:
URL: https://github.com/apache/iceberg/issues/5671

   ### Apache Iceberg version
   
   0.13.1
   
   ### Query engine
   
   Hive
   
   ### Please describe the bug 🐞
   
  In Iceberg upsert mode, create a v2 table like this:
   
   > create table upsert_update_time_test(
   > id bigint comment 'pk',
   > data bigint comment 'data',
   > update_time string comment 'update_time'
   > )
   >  comment 'upsert_update_time_test'
   > STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
   > TBLPROPERTIES (
   > 'engine.hive.enabled'='true',
   > 'write.metadata.delete-after-commit.enabled'='true',
   > 'write.target-file-size-bytes'='134217728',
   > 'write.metadata.previous-versions-max'='5',
   > 'write.metadata.metrics.default'='full',
   > 'format-version'='2'
   >  );
   
   
 Write data to Iceberg with Flink, as in the code below:
   
   > FlinkSink.forRow(rowDataStream, tableSchema)
   > .tableLoader(tableLoader)
   > .tableSchema(tableSchema)
   > .upsert(true)
   > .writeParallelism(1)
   > .equalityFieldColumns(ImmutableList.of("id"))
   > .append();
   
   And send data through the socket like this:
   > $ nc -lk 3287
   > I,1,101,2022-08-26 15:44:50
   > U,1,103,2022-08-26 15:45:23
   
![image](https://user-images.githubusercontent.com/26053387/187426590-57fc1d33-e07e-4892-b148-96253eebf786.png)
   
   
Finally, using hive and spark both got the following query results:
   
   > select * from upsert_update_time_test;
   > OK
   > upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
   > 1  103  2022-08-26 15:45:23
   > Time taken: 0.107 seconds, Fetched: 1 row(s)
   > hive (iceberg_yx)> select * from upsert_update_time_test where update_time <= '2022-08-26 15:45:00';
   > OK
   > upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
   > 1  101  2022-08-26 15:44:50
   > Time taken: 0.76 seconds, Fetched: 1 row(s)
   > hive (iceberg_yx)> select * from upsert_update_time_test where update_time <= '2022-08-26 15:46:00';
   > OK
   > upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
   > 1  103  2022-08-26 15:45:23
   > Time taken: 1.26 seconds, Fetched: 1 row(s)
   > hive (iceberg_yx)> select * from upsert_update_time_test where data <= 102;
   > OK
   > upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
   > 1  101  2022-08-26 15:44:50
   > Time taken: 0.119 seconds, Fetched: 1 row(s)
   > hive (iceberg_yx)> select * from upsert_update_time_test where data <= 103;
   > OK
   > upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
   > 1  103  2022-08-26 15:45:23
   > Time taken: 0.114 seconds, Fetched: 1 row(s)
   > hive (iceberg_yx)> select * from upsert_update_time_test where id = 1;
   > OK
   > upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
   > 1  103  2022-08-26 15:45:23
   > Time taken: 0.134 seconds, Fetched: 1 row(s)
   
   
![image](https://user-images.githubusercontent.com/26053387/187428640-94f5e24d-6381-4a27-acc4-b4341d7d5242.png)
   
   
![image](https://user-images.githubusercontent.com/26053387/187427949-a83c029d-66b2-4c45-9866-d96abbe07033.png)
   
   
The query results above show that a v2 table can **_return the historical version of a row when the predicate matches only the historical data_**. Is this a bug, or is there something wrong with my operation? Has anybody else met this?

   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [iceberg] Shane-Yu commented on issue #5671: The upsert mode can query the historical version of the data under certain conditions

2022-08-30 Thread GitBox


Shane-Yu commented on issue #5671:
URL: https://github.com/apache/iceberg/issues/5671#issuecomment-1231565348

   @rdblue @openinx @stevenzwu @kbendick   Can you guys take some time to look 
at this?





[GitHub] [iceberg] Fokko opened a new pull request, #5672: Python: Update docs and fine-tune the API

2022-08-30 Thread GitBox


Fokko opened a new pull request, #5672:
URL: https://github.com/apache/iceberg/pull/5672

   The API wasn't consistent everywhere. Now the IDs will initialize at 1 by default, so the user doesn't have to set them.





[GitHub] [iceberg] Shane-Yu closed issue #5671: The upsert mode can query the historical version of the data under certain conditions

2022-08-30 Thread GitBox


Shane-Yu closed issue #5671:  The upsert mode can query the historical version 
of the data under certain conditions
URL: https://github.com/apache/iceberg/issues/5671





[GitHub] [iceberg] rajan-v opened a new issue, #5673: Support for new offset based deletion interface deleteOffsetFromDataFile in DeleteFiles

2022-08-30 Thread GitBox


rajan-v opened a new issue, #5673:
URL: https://github.com/apache/iceberg/issues/5673

   ### Feature Request / Improvement
   
   Support for another interface in DeleteFiles:
   _DeleteFiles deleteOffsetFromDataFile(Map dataFileAndOffsetFileMap)_
   https://iceberg.apache.org/javadoc/master/org/apache/iceberg/DeleteFiles.html
   
   **Context**:
   If we process a data file directly and derive that we need to delete records at certain offsets, then we need an interface from Iceberg to pass that {datafile, offset} information. Until now, without upsert features in catalogs such as Hive tables, many legacy applications scan files to derive business details and apply updates the traditional way, by rewriting complete HDFS files. All of that computation could be reused if Iceberg supported this API.
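The requested semantics can be sketched as follows (a minimal Python sketch with hypothetical names; `deleteOffsetFromDataFile` itself is the proposal, not an existing API): the engine receives a map of data file to record offsets and masks exactly those positions, much like Iceberg's existing positional delete files do with (file_path, position) pairs.

```python
from typing import Dict, List, Set

def apply_offset_deletes(
    data_files: Dict[str, List[dict]], deletes: Dict[str, Set[int]]
) -> List[dict]:
    """Return the live rows after masking the given offsets per data file.

    Mirrors positional-delete semantics: each delete is effectively a
    (file_path, row_position) pair."""
    live = []
    for path, rows in data_files.items():
        dead = deletes.get(path, set())
        live.extend(row for pos, row in enumerate(rows) if pos not in dead)
    return live

files = {"part-00000.parquet": [{"id": 1}, {"id": 2}, {"id": 3}]}
print(apply_offset_deletes(files, {"part-00000.parquet": {1}}))
# [{'id': 1}, {'id': 3}]
```

The point of the proposal is that a caller who has already computed `deletes` while scanning files would hand that map to Iceberg instead of rewriting whole files.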
   
   
   **Option-1 (Eventual Deletes)**
   This can be thought of as eventual deletes: the delete flow just updates the delete data-file struct and skips updating the manifest and the stats around it.
   Any new snapshot commit can then compute the correct set of deletions and fix the manifest stats with respect to deletion.
   
   **Option-2 (Actual Deletes)**
   Go through the standard deletion flow and apply the offset deletes.
   
   
   I did some experiments executing deletes on Iceberg tables and then replacing or removing the immutable delete files generated in the respective directories, and was able to validate that those delete filters were applied while querying.
   
   
   ### Query engine
   
   Spark





[GitHub] [iceberg] samredai commented on pull request #4801: Add Configuration page

2022-08-30 Thread GitBox


samredai commented on PR #4801:
URL: https://github.com/apache/iceberg/pull/4801#issuecomment-1231689951

   @rdblue when you have a chance can you take another look at this?





[GitHub] [iceberg] huaxingao commented on a diff in pull request #5638: Bind overwrite filters

2022-08-30 Thread GitBox


huaxingao commented on code in PR #5638:
URL: https://github.com/apache/iceberg/pull/5638#discussion_r958586790


##
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java:
##
@@ -47,14 +55,39 @@ public void removeTable() {
 
   @Test
   public void testTableEquality() throws NoSuchTableException {
-    CatalogManager catalogManager = spark.sessionState().catalogManager();
-    TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
-    Identifier identifier = Identifier.of(tableIdent.namespace().levels(), tableIdent.name());
-    SparkTable table1 = (SparkTable) catalog.loadTable(identifier);
-    SparkTable table2 = (SparkTable) catalog.loadTable(identifier);
-
+    SparkTable table1 = loadTable();
+    SparkTable table2 = loadTable();
     // different instances pointing to the same table must be equivalent
     Assert.assertNotSame("References must be different", table1, table2);
     Assert.assertEquals("Tables must be equivalent", table1, table2);
   }
+
+  @Test
+  public void testOverwriteFilterConversions() throws NoSuchTableException {

Review Comment:
   Thanks a lot for taking a look at this PR!
   
   I looked at the real-world usage (`INSERT OVERWRITE` or `DataFrameWriterV2.overwrite`) and realized that Spark will actually throw `AnalysisException` if the overwrite filters are on invalid columns. So there is no need to bind the filters, and I will close this PR.
   
   The reason I opened this PR is that I was trying to address this [comment](https://github.com/apache/iceberg/pull/5302#discussion_r950580132). Since there is no need to bind the filters in `SparkFilters.convert(Filter[] filters)`, I will add back `SparkV2Filters.convert(Predicate[] predicates)`.
   
   I am also wondering whether this [bind](https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java#L122) is needed. If the filter expression is on invalid columns, Spark throws `AnalysisException` before it reaches that point. Shall I remove this bind?






[GitHub] [iceberg] msb1 commented on issue #5630: Failed to start the Flink task (write iceberg)

2022-08-30 Thread GitBox


msb1 commented on issue #5630:
URL: https://github.com/apache/iceberg/issues/5630#issuecomment-1231783307

   If you are using Gradle and creating a shadow jar, do not use `minimize()`.
   The build was configured with

       shadowJar {
           minimize()
           zip64 true
       }

   in build.gradle, and the error message was identical to the one shown in
   the first post above. Changing it to

       shadowJar {
           zip64 true
       }

   made the error disappear. It appears that `minimize()` was stripping the
   shaded caffeine classes from the fat jar. The same problem could probably
   occur with the Maven Shade plugin.
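   If `minimize()` is still wanted for jar size, the Shadow plugin also supports exempting individual dependencies from minimization. A sketch (the caffeine coordinates are assumed here and should be matched against your actual dependency tree):

```groovy
shadowJar {
    zip64 true
    minimize {
        // keep caffeine: minimize() cannot see its reflective/shaded usage
        exclude(dependency('com.github.ben-manes.caffeine:caffeine:.*'))
    }
}
```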
   
   





[GitHub] [iceberg] pvary commented on a diff in pull request #4518: core: Provide mechanism to cache manifest file content

2022-08-30 Thread GitBox


pvary commented on code in PR #4518:
URL: https://github.com/apache/iceberg/pull/4518#discussion_r958600742


##
core/src/main/java/org/apache/iceberg/ManifestFiles.java:
##
@@ -300,4 +328,14 @@ private static ManifestFile copyManifestInternal(

     return writer.toManifestFile();
   }
+
+  private static InputFile newInputFile(FileIO io, String path, long length) {
+    InputFile inputFile = io.newInputFile(path, length);

Review Comment:
   I have seen cases where calling `newInputFile` was the bottleneck, since the 
HadoopFileIo created a new `FileSystem` object every time. Why did we decide to 
key the cache with the `inputFile` instead of the `path`?






[GitHub] [iceberg] Fokko opened a new pull request, #5674: Python: Install PyYaml

2022-08-30 Thread GitBox


Fokko opened a new pull request, #5674:
URL: https://github.com/apache/iceberg/pull/5674

   Currently it is missing:
   
   ```
   root@88de3a02961f:/# pip install "git+https://github.com/apache/iceberg.git#subdirectory=python[pyarrow]"^C
   root@88de3a02961f:/# pyiceberg
   Traceback (most recent call last):
     File "/usr/local/bin/pyiceberg", line 5, in <module>
       from pyiceberg.cli.console import run
     File "/usr/local/lib/python3.9/site-packages/pyiceberg/cli/console.py", line 30, in <module>
       from pyiceberg.catalog import Catalog, load_catalog
     File "/usr/local/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py", line 37, in <module>
       from pyiceberg.utils.config import Config, merge_config
     File "/usr/local/lib/python3.9/site-packages/pyiceberg/utils/config.py", line 21, in <module>
       import yaml
   ModuleNotFoundError: No module named 'yaml'
   ```





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5669: Core: Expire Snapshots reachability analysis

2022-08-30 Thread GitBox


amogh-jahagirdar commented on code in PR #5669:
URL: https://github.com/apache/iceberg/pull/5669#discussion_r958663366


##
core/src/main/java/org/apache/iceberg/RemoveSnapshots.java:
##
@@ -623,22 +667,82 @@ private Set findFilesToDelete(
 return filesToDelete;
   }
 
+  // Helper to compute files to delete
+  private Set findFilesToDelete(

Review Comment:
   Updated






[GitHub] [iceberg] rdblue merged pull request #5021: Add API changes for statistics information in table metadata

2022-08-30 Thread GitBox


rdblue merged PR #5021:
URL: https://github.com/apache/iceberg/pull/5021





[GitHub] [iceberg] rizaon commented on a diff in pull request #4518: core: Provide mechanism to cache manifest file content

2022-08-30 Thread GitBox


rizaon commented on code in PR #4518:
URL: https://github.com/apache/iceberg/pull/4518#discussion_r958669856


##
core/src/main/java/org/apache/iceberg/ManifestFiles.java:
##
@@ -300,4 +328,14 @@ private static ManifestFile copyManifestInternal(

     return writer.toManifestFile();
   }
+
+  private static InputFile newInputFile(FileIO io, String path, long length) {
+    InputFile inputFile = io.newInputFile(path, length);

Review Comment:
   `inputFile` is not used as the cache key, but rather as a fallback within the `CachingInputFile` class (field `wrappedInputFile`) if `path` does not exist in `ContentCache` yet.
   
   I understand that `HadoopInputFile.fromLocation()` will create a new `FileSystem` upon initialization. Maybe I can tweak the code to lazily instantiate `wrappedInputFile` by passing `io`, `path`, and `length` as constructor parameters of `CachingInputFile`?
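   The lazy-instantiation idea can be sketched language-agnostically (Python here; all names are hypothetical stand-ins, not the real `ManifestFiles`/`CachingInputFile` code): the expensive fallback is only materialized on a cache miss, so a cache hit never pays the `newInputFile` cost.

```python
from typing import Callable, Dict

class LazyCachingInputFile:
    """Defers creating the (expensive) wrapped input file until needed."""

    def __init__(self, path: str, open_fn: Callable[[str], bytes], cache: Dict[str, bytes]):
        self.path = path
        self._open_fn = open_fn        # stands in for io.newInputFile(path, length)
        self._cache = cache
        self.fallback_created = False  # observable for the demo below

    def read(self) -> bytes:
        cached = self._cache.get(self.path)
        if cached is not None:         # cache hit: fallback never materialized
            return cached
        self.fallback_created = True   # cache miss: create the wrapped file now
        content = self._open_fn(self.path)
        self._cache[self.path] = content
        return content

cache = {"m1.avro": b"manifest-bytes"}
hit = LazyCachingInputFile("m1.avro", lambda p: b"opened:" + p.encode(), cache)
print(hit.read(), hit.fallback_created)  # b'manifest-bytes' False
```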






[GitHub] [iceberg] Fokko commented on a diff in pull request #5665: Core: Exclude old fields from the partition spec

2022-08-30 Thread GitBox


Fokko commented on code in PR #5665:
URL: https://github.com/apache/iceberg/pull/5665#discussion_r958677816


##
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java:
##
@@ -212,7 +211,7 @@ public Set capabilities() {
 
   @Override
   public MetadataColumn[] metadataColumns() {
-DataType sparkPartitionType = 
SparkSchemaUtil.convert(Partitioning.partitionType(table()));
+DataType sparkPartitionType = 
SparkSchemaUtil.convert(table().spec().partitionType());

Review Comment:
   Reading it a second time, and looking at the failing test, it makes sense to 
me:
   
   
https://github.com/apache/iceberg/blob/dbb8a404f6632a55acb36e949f0e7b84b643cede/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java#L135-L166
   
   We want to be able to jump back to earlier specs, and therefore we include 
them in the struct of the metadata column.






[GitHub] [iceberg-docs] bitsondatadev commented on pull request #131: Adding vendors page

2022-08-30 Thread GitBox


bitsondatadev commented on PR #131:
URL: https://github.com/apache/iceberg-docs/pull/131#issuecomment-1231925458

   Checking in here @rdblue and @samredai. Is anything holding this up?





[GitHub] [iceberg] Fokko merged pull request #5674: Python: Install PyYaml

2022-08-30 Thread GitBox


Fokko merged PR #5674:
URL: https://github.com/apache/iceberg/pull/5674





[GitHub] [iceberg] findepi commented on pull request #4741: Add implementation for statistics information in table snapshot

2022-08-30 Thread GitBox


findepi commented on PR #4741:
URL: https://github.com/apache/iceberg/pull/4741#issuecomment-1231967878

   Rebased after #5021 was merged to make the conflicts disappear.





[GitHub] [iceberg] findepi commented on pull request #5021: Add API changes for statistics information in table metadata

2022-08-30 Thread GitBox


findepi commented on PR #5021:
URL: https://github.com/apache/iceberg/pull/5021#issuecomment-1231966833

   Thank you for the merge!





[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

2022-08-30 Thread GitBox


Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958814479


##
python/pyiceberg/schema.py:
##
@@ -276,6 +279,32 @@ def primitive(self, primitive: PrimitiveType) -> T:
 """Visit a PrimitiveType"""
 
 
+class PreOrderSchemaVisitor(Generic[T], ABC):

Review Comment:
   It is pre-order traversal since we start at the root and then move to the leaves. In-order is a bit less intuitive since it is not a binary tree. You could also do a reverse in-order, but I'm not sure we need that. We could also call it CustomOrder if you have a strong preference, but I think pre-order is the most logical way of using this visitor.
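   To illustrate the property that matters for ID assignment (a toy example on plain dicts, not the pyiceberg visitor API): in pre-order traversal a node is always visited before any of its children, so a parent field can be numbered before the traversal descends into it.

```python
def pre_order(node, visit):
    """Depth-first traversal that visits a node before its children."""
    visit(node["name"])
    for child in node.get("children", []):
        pre_order(child, visit)

schema = {"name": "root", "children": [
    {"name": "id"},
    {"name": "address", "children": [{"name": "street"}, {"name": "zip"}]},
]}

order = []
pre_order(schema, order.append)
print(order)  # ['root', 'id', 'address', 'street', 'zip']
```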






[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

2022-08-30 Thread GitBox


Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958815658


##
python/pyiceberg/schema.py:
##
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:

     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   Yes, we do that in the function itself:
   ```python
   def assign_fresh_schema_ids(schema: Schema) -> Schema:
       """Traverses the schema, and sets new IDs"""
       schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())

       fresh_identifier_field_ids = []
       new_schema = Schema(*schema_struct.fields)
       for field_id in schema.identifier_field_ids:
           original_field_name = schema.find_column_name(field_id)
           if original_field_name is None:
               raise ValueError(f"Could not find field: {field_id}")
           fresh_field = new_schema.find_field(original_field_name)
           if fresh_field is None:
               raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
           fresh_identifier_field_ids.append(fresh_field.field_id)

       return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
   ```
   This is because we first want to know all the IDs






[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

2022-08-30 Thread GitBox


Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958853064


##
python/pyiceberg/schema.py:
##
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:

     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   Ah, I've refactored this because we need to build a map anyway šŸ‘šŸ» 



##
python/pyiceberg/schema.py:
##
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:

     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
+        return StructType(*[field() for field in field_results])
+
+    def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> IcebergType:
+        return NestedField(
+            field_id=self._get_and_increment(), name=field.name, field_type=field_result(), required=field.required, doc=field.doc

Review Comment:
   Missed that one, thanks! Just updated the code and tests






[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

2022-08-30 Thread GitBox


Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958853761


##
python/pyiceberg/table/metadata.py:
##
@@ -327,24 +334,43 @@ def check_sort_orders(cls, values: Dict[str, Any]):
 based on the spec. Implementations must throw an exception if a table’s
 version is higher than the supported version."""
 
-table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-"""A UUID that identifies the table, generated when the table is created.
-Implementations must throw an exception if a table’s UUID does not match
-the expected UUID after refreshing metadata."""
-
 last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
 """The table’s highest assigned sequence number, a monotonically
 increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)

Review Comment:
   Only when you create TableMetadata out of it (when creating a new table). 
And it resets if it isn't `1`.






[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

2022-08-30 Thread GitBox


Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958866429


##
python/pyiceberg/table/partitioning.py:
##
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:
 
 
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)

Review Comment:
   Sorry, that slipped through somehow






[GitHub] [iceberg] Fokko commented on pull request #5672: Python: Update docs and fine-tune the API

2022-08-30 Thread GitBox


Fokko commented on PR #5672:
URL: https://github.com/apache/iceberg/pull/5672#issuecomment-1232104623

   Waiting for https://github.com/apache/iceberg/pull/5627





[GitHub] [iceberg] jzhuge commented on pull request #4925: API: Add view interfaces

2022-08-30 Thread GitBox


jzhuge commented on PR #4925:
URL: https://github.com/apache/iceberg/pull/4925#issuecomment-1232176520

   Merged Amogh's PR, rebased, and applied spotless.





[GitHub] [iceberg] sumeetgajjar commented on pull request #5645: [Docs] Update drop table behavior in spark-ddl docs

2022-08-30 Thread GitBox


sumeetgajjar commented on PR #5645:
URL: https://github.com/apache/iceberg/pull/5645#issuecomment-1232267477

   A gentle ping @Fokko @samredai 





[GitHub] [iceberg] sumeetgajjar commented on pull request #5647: [0.14][Docs] Update drop table behavior in spark-ddl docs

2022-08-30 Thread GitBox


sumeetgajjar commented on PR #5647:
URL: https://github.com/apache/iceberg/pull/5647#issuecomment-1232267521

   A gentle ping @Fokko @samredai 





[GitHub] [iceberg] github-actions[bot] commented on issue #4257: Implement FileIO for Azure

2022-08-30 Thread GitBox


github-actions[bot] commented on issue #4257:
URL: https://github.com/apache/iceberg/issues/4257#issuecomment-1232301235

   This issue has been automatically marked as stale because it has been open 
for 180 days with no activity. It will be closed in next 14 days if no further 
activity occurs. To permanently prevent this issue from being considered stale, 
add the label 'not-stale', but commenting on the issue is preferred when 
possible.





[GitHub] [iceberg] jzhuge commented on a diff in pull request #4925: API: Add view interfaces

2022-08-30 Thread GitBox


jzhuge commented on code in PR #4925:
URL: https://github.com/apache/iceberg/pull/4925#discussion_r959050316


##
api/src/main/java/org/apache/iceberg/catalog/ViewCatalog.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.catalog;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewRepresentation;
+
+/**
+ * A Catalog API for view create, drop, and load operations.
+ */
+public interface ViewCatalog {
+
+  /**
+   * Return the name for this catalog.
+   *
+   * @return this catalog's name
+   */
+  String name();
+
+  /**
+   * Return all the identifiers under this namespace.
+   *
+   * @param namespace a namespace
+   * @return a list of identifiers for views
+   * @throws NotFoundException if the namespace is not found
+   */
+  List<TableIdentifier> listViews(Namespace namespace);
+
+  /**
+   * Load a view.
+   *
+   * @param identifier a view identifier
+   * @return instance of {@link View} implementation referred by the identifier
+   * @throws NoSuchViewException if the view does not exist
+   */
+  View loadView(TableIdentifier identifier);
+
+  /**
+   * Check whether view exists.
+   *
+   * @param identifier a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(TableIdentifier identifier) {
+try {
+  loadView(identifier);
+  return true;
+} catch (NoSuchViewException e) {
+  return false;
+}
+  }
+
+  /**
+   * Create a view.
+   *
+   * @param identifier a view identifier
+   * @param representations a list of view representations
+   * @param properties a string map of view properties
+   */
+  View createView(
+  TableIdentifier identifier,
+  List<ViewRepresentation> representations,

Review Comment:
   +1 definitely need a builder
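As an aside, the `viewExists` default method in the diff follows the common load-and-catch pattern, which keeps existence checks consistent with `loadView` by construction. A rough Python sketch of the same shape, using a hypothetical in-memory catalog:

```python
class NoSuchViewError(Exception):
    """Raised when a view identifier cannot be resolved."""

class InMemoryViewCatalog:
    def __init__(self):
        self._views = {}

    def load_view(self, identifier: str) -> dict:
        try:
            return self._views[identifier]
        except KeyError:
            raise NoSuchViewError(identifier)

    def view_exists(self, identifier: str) -> bool:
        # Existence is defined as "load succeeds", so the two
        # operations can never disagree with each other.
        try:
            self.load_view(identifier)
            return True
        except NoSuchViewError:
            return False

catalog = InMemoryViewCatalog()
catalog._views["db.v1"] = {"sql": "SELECT 1"}
print(catalog.view_exists("db.v1"), catalog.view_exists("db.v2"))
```

The trade-off of this pattern is one extra metadata load per existence check, which is usually acceptable for catalog operations.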






[GitHub] [iceberg] jzhuge commented on a diff in pull request #4925: API: Add view interfaces

2022-08-30 Thread GitBox


jzhuge commented on code in PR #4925:
URL: https://github.com/apache/iceberg/pull/4925#discussion_r959052594


##
api/src/main/java/org/apache/iceberg/view/ViewVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.view;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A version of the view at a point in time.
+ * 
+ * A version consists of a view metadata file.
+ * 
+ * Versions are created by view operations, like Create and Replace.
+ */
+public interface ViewVersion {

Review Comment:
   It is not a snapshot. We deliberately chose `version` to differentiate it from 
`snapshot`.
   As for `ViewCommit`, unfortunately that is another overused word :(






[GitHub] [iceberg] jzhuge commented on a diff in pull request #4925: API: Add view interfaces

2022-08-30 Thread GitBox


jzhuge commented on code in PR #4925:
URL: https://github.com/apache/iceberg/pull/4925#discussion_r959053085


##
api/src/main/java/org/apache/iceberg/view/ViewVersion.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.view;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A version of the view at a point in time.
+ * 
+ * A version consists of a view metadata file.
+ * 
+ * Versions are created by view operations, like Create and Replace.
+ */
+public interface ViewVersion {
+  /**
+   * Return this version's ID.
+   *
+   * @return a long ID

Review Comment:
   Done






[GitHub] [iceberg] hililiwei commented on a diff in pull request #4904: Flink: new sink base on the unified sink API

2022-08-30 Thread GitBox


hililiwei commented on code in PR #4904:
URL: https://github.com/apache/iceberg/pull/4904#discussion_r959093861


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/writer/StreamWriter.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.writer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.sink.committer.FilesCommittable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class StreamWriter
+    implements StatefulSinkWriter,
+        SinkWriter,
+        TwoPhaseCommittingSink.PrecommittingSinkWriter {
+  private static final long serialVersionUID = 1L;
+
+  private final String fullTableName;
+  private final TaskWriterFactory taskWriterFactory;
+  private transient TaskWriter writer;
+  private transient int subTaskId;
+  private final List writeResultsState = Lists.newArrayList();
+
+  public StreamWriter(
+  String fullTableName,
+  TaskWriterFactory taskWriterFactory,
+  int subTaskId,
+  int numberOfParallelSubtasks) {
+this.fullTableName = fullTableName;
+this.subTaskId = subTaskId;
+
+this.taskWriterFactory = taskWriterFactory;
+// Initialize the task writer factory.
+taskWriterFactory.initialize(numberOfParallelSubtasks, subTaskId);

Review Comment:
   Copy that. Let me fix it.






[GitHub] [iceberg] lvyanquan commented on a diff in pull request #5662: Doc: Update doc to display the results of the table partitions query

2022-08-30 Thread GitBox


lvyanquan commented on code in PR #5662:
URL: https://github.com/apache/iceberg/pull/5662#discussion_r959097698


##
docs/spark-queries.md:
##
@@ -318,12 +318,20 @@ To show a table's current partitions:
 SELECT * FROM prod.db.table.partitions
 ```
 
-| partition | record_count | file_count |
-| -- | -- | -- |
-|  {20211001, 11}|   1| 1|
-|  {20211002, 11}|   1| 1|
-|  {20211001, 10}|   1| 1|
-|  {20211002, 10}|   1| 1|
+If this table is not partitioned

Review Comment:
   addressed it.






[GitHub] [iceberg] badbye commented on issue #2586: Add geometry type to iceberg

2022-08-30 Thread GitBox


badbye commented on issue #2586:
URL: https://github.com/apache/iceberg/issues/2586#issuecomment-1232392801

   To fully support geometry, there are lots of things to do.
   1. Add geometry type.
   2. Partitioning.
   3. Filtering.
   4. Writing and reading.  
   
   First, we must figure out how to store geometry in Parquet and Avro files. 
[geomesa](https://github.com/locationtech/geomesa/tree/main/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/parquet) already did it, and [geoparquet](https://github.com/opengeospatial/geoparquet) is trying to set up a standard. For Avro, there is no clear approach yet.
   
   Second, we need query engines like Spark to read data from sources and write geometry records into files, since Iceberg only offers an API to append files, not records.
   
   Finally, (conditional) reading is not that hard to do.
   
   My team is working on it. Hopefully, we can make it by the end of 2022. 
   
   
   





[GitHub] [iceberg] dmgcodevil opened a new issue, #5675: Limit the number of files for rewrite/compaction action

2022-08-30 Thread GitBox


dmgcodevil opened a new issue, #5675:
URL: https://github.com/apache/iceberg/issues/5675

   ### Query engine
   
   Flink
   
   ### Question
   
   We have a streaming Flink job that continuously consumes records from Kafka 
and stores them into Iceberg. 
   The 
[RewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java)
 is getting slower over time, and the `TaskManager` is starting to fail with OOM. 
Is it possible to limit the number of data files for compaction? I have the 
following ideas:
   
   1. Modify 
https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java#L49
 to limit the number of `FileScanTask`s in a `CombinedScanTask`. We can create a 
_wrapper_ of sorts:
   
   ```java
   class BoundedCombinedScanTask implements CombinedScanTask {
       private final CombinedScanTask original;
       private final int limit;

       public BoundedCombinedScanTask(CombinedScanTask original, int limit) {
           this.original = original;
           this.limit = limit;
       }

       @Override
       public Collection<FileScanTask> files() {
           return original.files().stream().limit(limit).collect(Collectors.toList());
       }

       @Override
       public CombinedScanTask asCombinedScanTask() {
           return original;
       }
   }
   ```
   
   Flink code will look as follows:
   
   ```java
   protected List<DataFile> rewriteDataForTasks(final List<CombinedScanTask> combinedScanTasks) {
       final List<CombinedScanTask> boundedCombinedScanTasks = combinedScanTasks.stream()
           .map(t -> new BoundedCombinedScanTask(t, 100))
           .collect(Collectors.toList());
       int size = boundedCombinedScanTasks.size();
       int parallelism = Math.min(size, this.maxParallelism);
       DataStream<CombinedScanTask> dataStream = this.env.fromCollection(boundedCombinedScanTasks);
       RowDataRewriter rowDataRewriter = new RowDataRewriter(this.table(), 
this.caseSensitive(), this.fileIO(), this.encryptionManager());

       try {
           return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
       } catch (Exception var7) {
           throw new RuntimeException("Rewrite data file error.", var7);
       }
   }
   ```
   
   2. Modify 
[BaseRewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java)
 
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java#L268
   
   So that each group will contain at most N files.
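The second idea above is essentially re-binning: splitting a large collection of file tasks into groups of at most N before handing them to rewrite tasks. A minimal sketch of that grouping step, written independently of the Iceberg APIs:

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")

def split_into_groups(items: Iterable[T], max_per_group: int) -> List[List[T]]:
    # Greedy binning: each output group holds at most max_per_group items,
    # which bounds the amount of work a single rewrite task takes on.
    groups: List[List[T]] = []
    current: List[T] = []
    for item in items:
        current.append(item)
        if len(current) == max_per_group:
            groups.append(current)
            current = []
    if current:
        groups.append(current)
    return groups

print(split_into_groups(list(range(7)), 3))
```

Unlike option 1 (which silently drops files past the limit and leaves them for a later pass), this keeps every file and simply spreads them across more groups.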
   
   
   
   





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


stevenzwu commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959153410


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   nit: I think we probably don't need this boolean flag. Instead, we can mark 
writer as null after `writer.complete()`. then we can just use the null check 
on writer inside the `flush()` method.






[GitHub] [iceberg-docs] pvary commented on pull request #131: Adding vendors page

2022-08-30 Thread GitBox


pvary commented on PR #131:
URL: https://github.com/apache/iceberg-docs/pull/131#issuecomment-1232472565

   @samredai: I see that all of the comments were addressed. If you also think 
that the page is ready to be pushed, then I would be happy to merge.
   
   Thanks, Peter 





[GitHub] [iceberg] kbendick commented on pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


kbendick commented on PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#issuecomment-1232500424

   Hey @xuzhiwen1255, thanks for the patch!
   
   I’ve been out very sick, but this seems important. I’ll do my best to take a 
look as soon as possible. Thanks Steven for reviewing.





[GitHub] [iceberg] dotjdk opened a new issue, #5676: core: Dropping an old partition column causes NPE (and corrupt metadata on v2 tables)

2022-08-30 Thread GitBox


dotjdk opened a new issue, #5676:
URL: https://github.com/apache/iceberg/issues/5676

   ### Apache Iceberg version
   
   0.14.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug šŸž
   
   On a format version 2 table, dropping an old partition column on an iceberg 
table causes a `NullPointerException` in `PartitionSpecBuilder`, and every 
subsequent operation on the table throws the same exception.
   
   ```java
   java.lang.NullPointerException: Cannot find source column: 2
at 
org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:963)
at org.apache.iceberg.PartitionSpec$Builder.add(PartitionSpec.java:517)
at 
org.apache.iceberg.UnboundPartitionSpec.copyToBuilder(UnboundPartitionSpec.java:56)
at 
org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:44)
at 
org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:87)
   ```
   
   On a v1 table, the table is still accessible after, but the `ALTER TABLE` 
throws the same NPE.
   
   The issue is easily reproducible using the following script in a Spark Shell:
   
   ```sql
   CREATE TABLE data.test_table (ts timestamp not null, day_of_ts date) USING 
iceberg PARTITIONED BY (day_of_ts);
   ALTER TABLE data.test_table SET TBLPROPERTIES ('format-version' = '2');
   ALTER TABLE data.test_table REPLACE PARTITION FIELD day_of_ts WITH days(ts);
   ALTER TABLE data.test_table DROP COLUMN day_of_ts;
   REFRESH TABLE data.test_table;
   SELECT * FROM data.test_table;
   ```
   
   On closer inspection of the metadata, I see that on a v1 table the metadata 
is not updated when dropping the old partition field, which explains why the 
table still works on v1 afterwards, but I also don't see what the issue is with 
the v2 metadata.
   
   I am using Spark Shell on Spark 3.3.0 with Iceberg 0.14.0





[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


kbendick commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959194058


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   Which `writer.complete()` call are you suggesting? It seems to me that 
`writer.complete()` is called in `flush`, which is explicitly called in 
multiple places.






[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5662: Doc: Update doc to display the results of the table partitions query

2022-08-30 Thread GitBox


szehon-ho commented on code in PR #5662:
URL: https://github.com/apache/iceberg/pull/5662#discussion_r959194801


##
docs/spark-queries.md:
##
@@ -318,12 +318,15 @@ To show a table's current partitions:
 SELECT * FROM prod.db.table.partitions
 ```
 
-| partition | record_count | file_count |
-| -- | -- | -- |
-|  {20211001, 11}|   1| 1|
-|  {20211002, 11}|   1| 1|
-|  {20211001, 10}|   1| 1|
-|  {20211002, 10}|   1| 1|
+| partition | record_count | file_count | spec_id |
+| -- | -- | -- | -- |
+|  {20211001, 11}|   1| 1| 0|
+|  {20211002, 11}|   1| 1| 0|
+|  {20211001, 10}|   1| 1| 0|
+|  {20211002, 10}|   1| 1| 0|
+
+Note:
+If this table is non-partitioned, the resultSet will contain record_count and 
file_count only.

Review Comment:
   Nit: resultSet is kind of specific , as this is just about a table, can we 
just omit it like:  
   "..., it will contain only the record_count and file_count columns."






[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


xuzhiwen1255 commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959197838


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   > nit: I think we probably don't need this boolean flag. Instead, we can 
mark writer as null after `writer.complete()`. then we can just use the null 
check on writer inside the `flush()` method.
   
   Can you add an `isClosed` method to the writer to determine whether it has 
been closed?
   






[GitHub] [iceberg] xuzhiwen1255 commented on pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


xuzhiwen1255 commented on PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#issuecomment-1232511957

   > Hey @xuzhiwen1255, thanks for the patch!
   > 
   > I’ve been out very sick, but this seems important. I’ll do my best to take 
a look as soon as possible. Thanks Steven for reviewing.
   
   Thank you for your reply. Pay attention to your health while working. I wish 
you a speedy recovery.
   
   





[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


kbendick commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959213199


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   Oh never mind I see: we’re calling create and instantiating a new writer 
after the other places that call `flush`, but not this one. Arguably we don’t 
need the variable then.
   
   But we should consider cleaning up the above comment and adding information 
about the `null` check being used as the guard here or where the check happens.






[GitHub] [iceberg] lvyanquan commented on a diff in pull request #5662: Doc: Update doc to display the results of the table partitions query

2022-08-30 Thread GitBox


lvyanquan commented on code in PR #5662:
URL: https://github.com/apache/iceberg/pull/5662#discussion_r959213698


##
docs/spark-queries.md:
##
@@ -318,12 +318,15 @@ To show a table's current partitions:
 SELECT * FROM prod.db.table.partitions
 ```
 
-| partition | record_count | file_count |
-| -- | -- | -- |
-|  {20211001, 11}|   1| 1|
-|  {20211002, 11}|   1| 1|
-|  {20211001, 10}|   1| 1|
-|  {20211002, 10}|   1| 1|
+| partition | record_count | file_count | spec_id |
+| -- | -- | -- | -- |
+|  {20211001, 11}|   1| 1| 0|
+|  {20211002, 11}|   1| 1| 0|
+|  {20211001, 10}|   1| 1| 0|
+|  {20211002, 10}|   1| 1| 0|
+
+Note:
+If this table is non-partitioned, the resultSet will contain record_count and 
file_count only.

Review Comment:
   Thanks for your suggestion, addressed it.






[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


xuzhiwen1255 commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959214446


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   I understand. I'll go and fix it






[GitHub] [iceberg] tongwei opened a new issue, #5677: Why locality is disable default when FileSystem scheme is not hdfs

2022-08-30 Thread GitBox


tongwei opened a new issue, #5677:
URL: https://github.com/apache/iceberg/issues/5677

   ### Query engine
   
   spark
   
   ### Question
   
   When I test Iceberg with Alluxio and Spark, I notice that locality is 
disabled by default when the FileSystem scheme is not hdfs. To enable it, I can 
only use `spark.read.option("locality", true).table("table_name")` to make Spark 
use locality. Why is locality disabled when the FileSystem scheme is not hdfs? 
Can we enable it by default?





[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


kbendick commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959214832


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   I agree that a method might be helpful for readability.
   
   `isClosed` seems odd to me personally as there is a `close` method on the 
class and we haven’t necessarily called it yet. Do you suggest adding 
`isClosed` to the writer itself?






[GitHub] [iceberg] tongwei commented on issue #5677: Why locality is disable default when FileSystem scheme is not hdfs

2022-08-30 Thread GitBox


tongwei commented on issue #5677:
URL: https://github.com/apache/iceberg/issues/5677#issuecomment-1232531922

   ![UvqGOmq3fK](https://user-images.githubusercontent.com/32157039/187612031-89a21480-9dda-4981-9d2b-4d0a9b2dda6d.jpg)





[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


kbendick commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959217458


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   If the writer class isn’t being touched (I haven’t looked that closely yet) 
or if the method will be placed on this class, I’d consider naming the method 
something like `shouldFlushWriter` or something.
   
   But it does seem that checking for null is technically sufficient overall. 
Thanks @xuzhiwen1255.







[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


kbendick commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959219452


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   Also, if it’s possible to add a test for this, that would be great! However, 
the test seems admittedly like it might be flaky by nature, so if it’s not 
consistently possible to test for this behavior or a spy can’t be used or 
something, don’t worry too much about it. But looking into it would be great!






[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


xuzhiwen1255 commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959222044


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   That was my suggestion; I think the null check is enough.
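
   The guard being discussed in this thread can be sketched roughly as follows. 
This is a hypothetical minimal class, not the actual `IcebergStreamWriter`: 
after `endInput()` flushes the remaining files, the writer field is set to 
null, and `flush()` uses the null check as its guard, so no separate `ended` 
boolean is needed.

   ```java
   // Hypothetical sketch of the null-check guard -- not the real Iceberg code.
   class StreamWriterSketch {
     // Stands in for the real TaskWriter; any object works for the sketch.
     private Object writer = new Object();

     // Counts how many times a real flush would have emitted files downstream.
     int flushCount = 0;

     void flush() {
       if (writer == null) {
         return; // writer already completed; nothing left to flush
       }
       flushCount++;
     }

     void endInput() {
       flush();
       writer = null; // mark completion via null instead of a boolean flag
     }
   }
   ```

   With this shape, calling `endInput()` and then a later `flush()` (as the 
real writer's close path may do) performs only one actual flush, which is the 
double-emit the PR is guarding against.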






[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate

2022-08-30 Thread GitBox


xuzhiwen1255 commented on code in PR #5642:
URL: https://github.com/apache/iceberg/pull/5642#discussion_r959223612


##
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
 // remaining completed files to downstream before closing the writer so 
that we won't miss any
 // of them.
 flush();
+ended = true;

Review Comment:
   The nice thing about adding a new method is that the code will look a little 
cleaner, and if we're adding a new method, we'll name it something that matches 
its purpose.
   @kbendick 


