[GitHub] [iceberg] Shane-Yu opened a new issue, #5671: The upsert mode can query the historical version of the data under certain conditions
Shane-Yu opened a new issue, #5671: URL: https://github.com/apache/iceberg/issues/5671

### Apache Iceberg version

0.13.1

### Query engine

Hive

### Please describe the bug

In Iceberg upsert mode, create a v2 table like this:

```sql
create table upsert_update_time_test(
  id bigint comment 'pk',
  data bigint comment 'data',
  update_time string comment 'update_time'
)
comment 'upsert_update_time_test'
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
  'engine.hive.enabled'='true',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.target-file-size-bytes'='134217728',
  'write.metadata.previous-versions-max'='5',
  'write.metadata.metrics.default'='full',
  'format-version'='2'
);
```

Write data to Iceberg with Flink like the code below:

```java
FlinkSink.forRow(rowDataStream, tableSchema)
    .tableLoader(tableLoader)
    .tableSchema(tableSchema)
    .upsert(true)
    .writeParallelism(1)
    .equalityFieldColumns(ImmutableList.of("id"))
    .append();
```

And send data like this:

```
$ nc -lk 3287
I,1,101,2022-08-26 15:44:50
U,1,103,2022-08-26 15:45:23
```

Finally, querying with both Hive and Spark gives the following results:

```
hive (iceberg_yx)> select * from upsert_update_time_test;
OK
upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
1  103  2022-08-26 15:45:23
Time taken: 0.107 seconds, Fetched: 1 row(s)

hive (iceberg_yx)> select * from upsert_update_time_test where update_time <= '2022-08-26 15:45:00';
OK
upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
1  101  2022-08-26 15:44:50
Time taken: 0.76 seconds, Fetched: 1 row(s)

hive (iceberg_yx)> select * from upsert_update_time_test where update_time <= '2022-08-26 15:46:00';
OK
upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
1  103  2022-08-26 15:45:23
Time taken: 1.26 seconds, Fetched: 1 row(s)

hive (iceberg_yx)> select * from upsert_update_time_test where data <= 102;
OK
upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
1  101  2022-08-26 15:44:50
Time taken: 0.119 seconds, Fetched: 1 row(s)

hive (iceberg_yx)> select * from upsert_update_time_test where data <= 103;
OK
upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
1  103  2022-08-26 15:45:23
Time taken: 0.114 seconds, Fetched: 1 row(s)

hive (iceberg_yx)> select * from upsert_update_time_test where id = 1;
OK
upsert_update_time_test.id  upsert_update_time_test.data  upsert_update_time_test.update_time
1  103  2022-08-26 15:45:23
Time taken: 0.134 seconds, Fetched: 1 row(s)
```

The above query results show that the v2 table **returns the historical version of a row when the filter predicate only matches the old values**. Is this a bug, or is there something wrong with my operation? Has anybody else met this?
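One way to narrow this down is to check whether equality-delete files were committed at all. The sketch below is illustrative only: it assumes a Spark session already configured so the `iceberg_yx` database is reachable, and which metadata tables expose delete files varies by Iceberg version (older releases may list only data files in `files`; newer ones also offer a separate `delete_files` table).

```java
import org.apache.spark.sql.SparkSession;

public class InspectUpsertDeletes {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("inspect-upsert-deletes")
        .getOrCreate();

    // content = 0 marks data files, 1 position-delete files, 2 equality-delete files.
    // If no rows with content = 2 show up, the Flink upsert writer never committed
    // equality deletes, and the stale row would survive in every query path.
    spark.sql("SELECT content, file_path, record_count "
            + "FROM iceberg_yx.upsert_update_time_test.files")
        .show(100, false);
  }
}
```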
[GitHub] [iceberg] Shane-Yu commented on issue #5671: The upsert mode can query the historical version of the data under certain conditions
Shane-Yu commented on issue #5671: URL: https://github.com/apache/iceberg/issues/5671#issuecomment-1231565348

@rdblue @openinx @stevenzwu @kbendick Can you guys take some time to look at this?
[GitHub] [iceberg] Fokko opened a new pull request, #5672: Python: Update docs and fine-tune the API
Fokko opened a new pull request, #5672: URL: https://github.com/apache/iceberg/pull/5672

The API wasn't consistent everywhere. Now the ids will just initialize at 1, so the user doesn't have to do this.
[GitHub] [iceberg] Shane-Yu closed issue #5671: The upsert mode can query the historical version of the data under certain conditions
Shane-Yu closed issue #5671: The upsert mode can query the historical version of the data under certain conditions URL: https://github.com/apache/iceberg/issues/5671
[GitHub] [iceberg] rajan-v opened a new issue, #5673: Support for new offset based deletion interface deleteOffsetFromDataFile in DeleteFiles
rajan-v opened a new issue, #5673: URL: https://github.com/apache/iceberg/issues/5673

### Feature Request / Improvement

Support for another interface in DeleteFiles: _DeleteFiles deleteOffsetFromDataFile(Map dataFileAndOffsetFileMap)_ (https://iceberg.apache.org/javadoc/master/org/apache/iceberg/DeleteFiles.html)

**Context**: If we process a data file directly and derive that we need to delete records at certain offsets in that file, then we need some interface from Iceberg to pass that {datafile, offset} information. Until now, without upsert features in catalogs such as Hive tables, a lot of legacy applications have been scanning files and deriving business-related details while doing updates the traditional way, by rewriting complete HDFS files. All of that computation could be reused if Iceberg supported this API.

**Option 1 (Eventual Deletes)**: This can be thought of as eventual deletes, since the delete flow can just update the delete data files struct and skip updating the manifest and the stats around it. Any new snapshot commit can compute the correct set of deletions and fix the manifest stats with respect to those deletes.

**Option 2 (Actual Deletes)**: Go through the standard flow of deletion and apply the offset deletes. I did some experiments executing deletes on Iceberg tables and then replacing or removing the immutable delete files generated in the respective directories, and was able to validate that those delete filters were applied while querying.

### Query engine

Spark
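A rough sketch of what the requested interface could look like, purely for illustration: the map's type parameters are assumptions, since the issue text elides the generics, and the method name follows the issue's wording rather than any existing Iceberg API.

```java
import java.util.Map;
import org.apache.iceberg.DeleteFiles;

/**
 * Hypothetical extension of DeleteFiles for offset-based row deletes.
 * Nothing here exists in Iceberg today; all types and names are illustrative.
 */
interface OffsetDeleteFiles extends DeleteFiles {

  /**
   * Delete rows by position within specific data files.
   *
   * @param dataFileOffsets map from a data file path to the row offsets to delete in that file
   * @return this for method chaining
   */
  OffsetDeleteFiles deleteOffsetFromDataFile(Map<String, long[]> dataFileOffsets);
}
```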
[GitHub] [iceberg] samredai commented on pull request #4801: Add Configuration page
samredai commented on PR #4801: URL: https://github.com/apache/iceberg/pull/4801#issuecomment-1231689951

@rdblue when you have a chance can you take another look at this?
[GitHub] [iceberg] huaxingao commented on a diff in pull request #5638: Bind overwrite filters
huaxingao commented on code in PR #5638: URL: https://github.com/apache/iceberg/pull/5638#discussion_r958586790

spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java:

```
@@ -47,14 +55,39 @@ public void removeTable() {

   @Test
   public void testTableEquality() throws NoSuchTableException {
-    CatalogManager catalogManager = spark.sessionState().catalogManager();
-    TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
-    Identifier identifier = Identifier.of(tableIdent.namespace().levels(), tableIdent.name());
-    SparkTable table1 = (SparkTable) catalog.loadTable(identifier);
-    SparkTable table2 = (SparkTable) catalog.loadTable(identifier);
-
+    SparkTable table1 = loadTable();
+    SparkTable table2 = loadTable();
     // different instances pointing to the same table must be equivalent
     Assert.assertNotSame("References must be different", table1, table2);
     Assert.assertEquals("Tables must be equivalent", table1, table2);
   }
+
+  @Test
+  public void testOverwriteFilterConversions() throws NoSuchTableException {
```

Review Comment: Thanks a lot for taking a look at this PR! I looked at the real-world usage (`INSERT OVERWRITE` or `DataFrameWriterV2.overwrite`) and realized that Spark will actually throw `AnalysisException` if the overwrite filters are on invalid columns, so there is no need to bind the filters. I will close this PR. The reason I did this PR is that I was trying to address this [comment](https://github.com/apache/iceberg/pull/5302#discussion_r950580132). Since there is no need to bind the filters in `SparkFilters.convert(Filter[] filters)`, I will add back `SparkV2Filters.convert(Predicate[] predicates)`. I am also wondering whether this [bind](https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java#L122) is needed: if the filter expression is on invalid columns, Spark throws `AnalysisException` before it reaches here. Shall I remove this bind?
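For reference, a minimal sketch of the behavior described above; the table and column names are made up, and this assumes an existing `SparkSession` with an Iceberg catalog configured.

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OverwriteInvalidColumn {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();
    Dataset<Row> df = spark.table("prod.db.source");

    // The overwrite condition references a column that does not exist in the
    // target table, so Spark's analyzer rejects it with an AnalysisException
    // before the filter is ever handed to Iceberg's SparkFilters.convert(...).
    df.writeTo("prod.db.target")
        .overwrite(col("no_such_column").leq(102));
  }
}
```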
[GitHub] [iceberg] msb1 commented on issue #5630: Failed to start the Flink task (write iceberg)
msb1 commented on issue #5630: URL: https://github.com/apache/iceberg/issues/5630#issuecomment-1231783307

If you are using Gradle and creating a shadow jar, do not use minimize(). I was building with

```groovy
shadowJar {
    minimize()
    zip64 true
}
```

in build.gradle and had an error message identical to the one shown in the first post above. Changing to

```groovy
shadowJar {
    zip64 true
}
```

in build.gradle made the error go away. It appears that minimize() was removing the shaded caffeine classes from the fat jar. The same problem could probably occur with the Maven shade plugin as well.
[GitHub] [iceberg] pvary commented on a diff in pull request #4518: core: Provide mechanism to cache manifest file content
pvary commented on code in PR #4518: URL: https://github.com/apache/iceberg/pull/4518#discussion_r958600742

core/src/main/java/org/apache/iceberg/ManifestFiles.java:

```
@@ -300,4 +328,14 @@ private static ManifestFile copyManifestInternal(
     return writer.toManifestFile();
   }
+
+  private static InputFile newInputFile(FileIO io, String path, long length) {
+    InputFile inputFile = io.newInputFile(path, length);
```

Review Comment: I have seen cases where calling `newInputFile` was the bottleneck, since the HadoopFileIO created a new `FileSystem` object every time. Why did we decide to key the cache with the `inputFile` instead of the `path`?
[GitHub] [iceberg] Fokko opened a new pull request, #5674: Python: Install PyYaml
Fokko opened a new pull request, #5674: URL: https://github.com/apache/iceberg/pull/5674

Currently it is missing:

```
root@88de3a02961f:/# pip install "git+https://github.com/apache/iceberg.git#subdirectory=python[pyarrow]"^C
root@88de3a02961f:/# pyiceberg
Traceback (most recent call last):
  File "/usr/local/bin/pyiceberg", line 5, in <module>
    from pyiceberg.cli.console import run
  File "/usr/local/lib/python3.9/site-packages/pyiceberg/cli/console.py", line 30, in <module>
    from pyiceberg.catalog import Catalog, load_catalog
  File "/usr/local/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py", line 37, in <module>
    from pyiceberg.utils.config import Config, merge_config
  File "/usr/local/lib/python3.9/site-packages/pyiceberg/utils/config.py", line 21, in <module>
    import yaml
ModuleNotFoundError: No module named 'yaml'
```
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5669: Core: Expire Snapshots reachability analysis
amogh-jahagirdar commented on code in PR #5669: URL: https://github.com/apache/iceberg/pull/5669#discussion_r958663366

core/src/main/java/org/apache/iceberg/RemoveSnapshots.java:

```
@@ -623,22 +667,82 @@ private Set findFilesToDelete(
     return filesToDelete;
   }
+
+  // Helper to compute files to delete
+  private Set findFilesToDelete(
```

Review Comment: Updated
[GitHub] [iceberg] rdblue merged pull request #5021: Add API changes for statistics information in table metadata
rdblue merged PR #5021: URL: https://github.com/apache/iceberg/pull/5021
[GitHub] [iceberg] rizaon commented on a diff in pull request #4518: core: Provide mechanism to cache manifest file content
rizaon commented on code in PR #4518: URL: https://github.com/apache/iceberg/pull/4518#discussion_r958669856

core/src/main/java/org/apache/iceberg/ManifestFiles.java:

```
@@ -300,4 +328,14 @@ private static ManifestFile copyManifestInternal(
     return writer.toManifestFile();
   }
+
+  private static InputFile newInputFile(FileIO io, String path, long length) {
+    InputFile inputFile = io.newInputFile(path, length);
```

Review Comment: `inputFile` is not used as the cache key, but rather as a fallback within the `CachingInputFile` class (field `wrappedInputFile`) if `path` does not exist in the `ContentCache` yet. I understand that `HadoopInputFile.fromLocation()` will create a new `FileSystem` upon initialization. Maybe I can tweak the code to lazily instantiate `wrappedInputFile` by passing `io`, `path`, and `length` as constructor parameters of `CachingInputFile` instead?
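A minimal sketch of that lazy-instantiation idea. The class and field names are assumed for illustration and deliberately simplified from the actual `CachingInputFile` in the PR; a real implementation would consult the content cache before falling back to the wrapped file.

```java
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

// Defers io.newInputFile(path, length) until a caller actually needs the
// wrapped file, so no FileSystem object is created on a pure cache hit.
class LazyCachingInputFile implements InputFile {
  private final FileIO io;
  private final String path;
  private final long length;
  private InputFile wrapped; // created on first use

  LazyCachingInputFile(FileIO io, String path, long length) {
    this.io = io;
    this.path = path;
    this.length = length;
  }

  private InputFile wrapped() {
    if (wrapped == null) {
      wrapped = io.newInputFile(path, length);
    }
    return wrapped;
  }

  @Override
  public long getLength() {
    return length; // known up front, no need to touch the wrapped file
  }

  @Override
  public SeekableInputStream newStream() {
    return wrapped().newStream(); // a content-cache lookup would go here first
  }

  @Override
  public String location() {
    return path;
  }

  @Override
  public boolean exists() {
    return wrapped().exists();
  }
}
```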
[GitHub] [iceberg] Fokko commented on a diff in pull request #5665: Core: Exclude old fields from the partition spec
Fokko commented on code in PR #5665: URL: https://github.com/apache/iceberg/pull/5665#discussion_r958677816

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java:

```
@@ -212,7 +211,7 @@ public Set capabilities() {

   @Override
   public MetadataColumn[] metadataColumns() {
-    DataType sparkPartitionType = SparkSchemaUtil.convert(Partitioning.partitionType(table()));
+    DataType sparkPartitionType = SparkSchemaUtil.convert(table().spec().partitionType());
```

Review Comment: Reading it a second time, and looking at the failing test, it makes sense to me: https://github.com/apache/iceberg/blob/dbb8a404f6632a55acb36e949f0e7b84b643cede/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java#L135-L166 We want to be able to jump back to earlier specs, and therefore we include them in the struct of the metadata column.
[GitHub] [iceberg-docs] bitsondatadev commented on pull request #131: Adding vendors page
bitsondatadev commented on PR #131: URL: https://github.com/apache/iceberg-docs/pull/131#issuecomment-1231925458

Checking in here @rdblue and @samredai. Is anything holding this up?
[GitHub] [iceberg] Fokko merged pull request #5674: Python: Install PyYaml
Fokko merged PR #5674: URL: https://github.com/apache/iceberg/pull/5674
[GitHub] [iceberg] findepi commented on pull request #4741: Add implementation for statistics information in table snapshot
findepi commented on PR #4741: URL: https://github.com/apache/iceberg/pull/4741#issuecomment-1231967878

Rebased after #5021 was merged, to make the conflicts disappear.
[GitHub] [iceberg] findepi commented on pull request #5021: Add API changes for statistics information in table metadata
findepi commented on PR #5021: URL: https://github.com/apache/iceberg/pull/5021#issuecomment-1231966833

Thank you for the merge!
[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids
Fokko commented on code in PR #5627: URL: https://github.com/apache/iceberg/pull/5627#discussion_r958814479

python/pyiceberg/schema.py:

```
@@ -276,6 +279,32 @@ def primitive(self, primitive: PrimitiveType) -> T:
         """Visit a PrimitiveType"""

+class PreOrderSchemaVisitor(Generic[T], ABC):
```

Review Comment: It is pre-order traversal since we start at the root and then move to the leaves. In-order is a bit less intuitive since it is not a binary tree. You could also do a reverse in-order, but I'm not sure we need that. We could also call it CustomOrder if you have a strong preference, but I think pre-order is the most logical way of using this visitor.
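To make the traversal-order point concrete, here is a tiny generic sketch (not pyiceberg's actual visitor API, and the node types are invented): a pre-order walk handles the parent before its children, which is exactly what top-down ID assignment needs.

```java
import java.util.List;

// Illustrative only: pre-order assigns each parent its value before descending,
// so decisions made at the root can flow down to the leaves.
final class PreOrderWalk {
  sealed interface Node permits Leaf, Branch {}
  record Leaf(String name) implements Node {}
  record Branch(String name, List<Node> children) implements Node {}

  static int assignIds(Node node, int nextId) {
    System.out.println("id " + nextId + " -> " + name(node)); // parent first
    int id = nextId + 1;
    if (node instanceof Branch b) {
      for (Node child : b.children()) {
        id = assignIds(child, id); // then each child, left to right
      }
    }
    return id;
  }

  private static String name(Node node) {
    return node instanceof Branch b ? b.name() : ((Leaf) node).name();
  }

  public static void main(String[] args) {
    Node root = new Branch("root",
        List.of(new Leaf("a"), new Branch("b", List.of(new Leaf("c")))));
    assignIds(root, 1); // prints root=1, a=2, b=3, c=4
  }
}
```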
[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids
Fokko commented on code in PR #5627: URL: https://github.com/apache/iceberg/pull/5627#discussion_r958815658

python/pyiceberg/schema.py:

```
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:

     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)
```

Review Comment: Yes, we do that in the function itself:

```python
def assign_fresh_schema_ids(schema: Schema) -> Schema:
    """Traverses the schema, and sets new IDs"""
    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())

    fresh_identifier_field_ids = []
    new_schema = Schema(*schema_struct.fields)
    for field_id in schema.identifier_field_ids:
        original_field_name = schema.find_column_name(field_id)
        if original_field_name is None:
            raise ValueError(f"Could not find field: {field_id}")
        fresh_field = new_schema.find_field(original_field_name)
        if fresh_field is None:
            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
        fresh_identifier_field_ids.append(fresh_field.field_id)

    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
```

This is because we first want to know all the IDs.
[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids
Fokko commented on code in PR #5627: URL: https://github.com/apache/iceberg/pull/5627#discussion_r958853064

python/pyiceberg/schema.py:

```
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)
```

Review Comment: Ah, I've refactored this because we need to build a map anyway 👍🏻

python/pyiceberg/schema.py (same hunk, continued):

```
+    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
+        return StructType(*[field() for field in field_results])
+
+    def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> IcebergType:
+        return NestedField(
+            field_id=self._get_and_increment(), name=field.name, field_type=field_result(), required=field.required, doc=field.doc
```

Review Comment: Missed that one, thanks! Just updated the code and tests.
[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids
Fokko commented on code in PR #5627: URL: https://github.com/apache/iceberg/pull/5627#discussion_r958853761

python/pyiceberg/table/metadata.py:

```
@@ -327,24 +334,43 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table's
     version is higher than the supported version."""

-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table's UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table's highest assigned sequence number, a monotonically increasing
    long that tracks the order of snapshots in a table."""

-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
```

Review Comment: Only when you create TableMetadata out of it (when creating a new table). And it resets if it isn't `1`.
[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids
Fokko commented on code in PR #5627: URL: https://github.com/apache/iceberg/pull/5627#discussion_r958866429

python/pyiceberg/table/partitioning.py:

```
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:


 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)
```

Review Comment: Sorry, that slipped through somehow
[GitHub] [iceberg] Fokko commented on pull request #5672: Python: Update docs and fine-tune the API
Fokko commented on PR #5672: URL: https://github.com/apache/iceberg/pull/5672#issuecomment-1232104623

Waiting for https://github.com/apache/iceberg/pull/5627
[GitHub] [iceberg] jzhuge commented on pull request #4925: API: Add view interfaces
jzhuge commented on PR #4925: URL: https://github.com/apache/iceberg/pull/4925#issuecomment-1232176520

Merged Amogh's PR, rebased, and applied spotless.
[GitHub] [iceberg] sumeetgajjar commented on pull request #5645: [Docs] Update drop table behavior in spark-ddl docs
sumeetgajjar commented on PR #5645: URL: https://github.com/apache/iceberg/pull/5645#issuecomment-1232267477

A gentle ping @Fokko @samredai
[GitHub] [iceberg] sumeetgajjar commented on pull request #5647: [0.14][Docs] Update drop table behavior in spark-ddl docs
sumeetgajjar commented on PR #5647: URL: https://github.com/apache/iceberg/pull/5647#issuecomment-1232267521

A gentle ping @Fokko @samredai
[GitHub] [iceberg] github-actions[bot] commented on issue #4257: Implement FileIO for Azure
github-actions[bot] commented on issue #4257: URL: https://github.com/apache/iceberg/issues/4257#issuecomment-1232301235

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
[GitHub] [iceberg] jzhuge commented on a diff in pull request #4925: API: Add view interfaces
jzhuge commented on code in PR #4925: URL: https://github.com/apache/iceberg/pull/4925#discussion_r959050316

api/src/main/java/org/apache/iceberg/catalog/ViewCatalog.java (new file, @@ -0,0 +1,153 @@):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.catalog;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewRepresentation;

/**
 * A Catalog API for view create, drop, and load operations.
 */
public interface ViewCatalog {

  /**
   * Return the name for this catalog.
   *
   * @return this catalog's name
   */
  String name();

  /**
   * Return all the identifiers under this namespace.
   *
   * @param namespace a namespace
   * @return a list of identifiers for views
   * @throws NotFoundException if the namespace is not found
   */
  List<TableIdentifier> listViews(Namespace namespace);

  /**
   * Load a view.
   *
   * @param identifier a view identifier
   * @return instance of {@link View} implementation referred by the identifier
   * @throws NoSuchViewException if the view does not exist
   */
  View loadView(TableIdentifier identifier);

  /**
   * Check whether view exists.
   *
   * @param identifier a view identifier
   * @return true if the table exists, false otherwise
   */
  default boolean viewExists(TableIdentifier identifier) {
    try {
      loadView(identifier);
      return true;
    } catch (NoSuchViewException e) {
      return false;
    }
  }

  /**
   * Create a view.
   *
   * @param identifier a view identifier
   * @param representations a list of view representations
   * @param properties a string map of view properties
   */
  View createView(
      TableIdentifier identifier,
      List<ViewRepresentation> representations,
```

Review Comment: +1 definitely need a builder
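For context, a hedged sketch of what a builder-style alternative to `createView` could look like, loosely mirroring `Catalog#buildTable`; every method name below is hypothetical and not part of the PR.

```java
// Hypothetical usage of a view builder; none of these methods exist yet.
View view = catalog.buildView(TableIdentifier.of("db", "event_agg"))
    .withSchema(schema)
    .withQuery("spark", "SELECT id, count(*) AS cnt FROM db.events GROUP BY id")
    .withProperty("comment", "daily aggregation view")
    .create();
```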
[GitHub] [iceberg] jzhuge commented on a diff in pull request #4925: API: Add view interfaces
jzhuge commented on code in PR #4925: URL: https://github.com/apache/iceberg/pull/4925#discussion_r959052594

api/src/main/java/org/apache/iceberg/view/ViewVersion.java (new file, @@ -0,0 +1,71 @@):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.view;

import java.util.List;
import java.util.Map;

/**
 * A version of the view at a point in time.
 *
 * A version consists of a view metadata file.
 *
 * Versions are created by view operations, like Create and Replace.
 */
public interface ViewVersion {
```

Review Comment: It is not a snapshot; we deliberately chose `version` to differentiate from `snapshot`. As for `ViewCommit`, unfortunately that is another over-used word :(
[GitHub] [iceberg] jzhuge commented on a diff in pull request #4925: API: Add view interfaces
jzhuge commented on code in PR #4925: URL: https://github.com/apache/iceberg/pull/4925#discussion_r959053085

api/src/main/java/org/apache/iceberg/view/ViewVersion.java (same new file as above, continued at the version ID accessor):

```java
public interface ViewVersion {
  /**
   * Return this version's ID.
   *
   * @return a long ID
```

Review Comment: Done
[GitHub] [iceberg] hililiwei commented on a diff in pull request #4904: Flink: new sink base on the unified sink API
hililiwei commented on code in PR #4904: URL: https://github.com/apache/iceberg/pull/4904#discussion_r959093861

flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/writer/StreamWriter.java (new file, @@ -0,0 +1,107 @@):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink.sink.writer;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.flink.sink.committer.FilesCommittable;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class StreamWriter
    implements StatefulSinkWriter,
        SinkWriter,
        TwoPhaseCommittingSink.PrecommittingSinkWriter {
  private static final long serialVersionUID = 1L;

  private final String fullTableName;
  private final TaskWriterFactory taskWriterFactory;
  private transient TaskWriter writer;
  private transient int subTaskId;
  private final List writeResultsState = Lists.newArrayList();

  public StreamWriter(
      String fullTableName,
      TaskWriterFactory taskWriterFactory,
      int subTaskId,
      int numberOfParallelSubtasks) {
    this.fullTableName = fullTableName;
    this.subTaskId = subTaskId;

    this.taskWriterFactory = taskWriterFactory;
    // Initialize the task writer factory.
    taskWriterFactory.initialize(numberOfParallelSubtasks, subTaskId);
```

Review Comment: Copy that. Let me fix it.
[GitHub] [iceberg] lvyanquan commented on a diff in pull request #5662: Doc: Update doc to display the results of the table partitions query
lvyanquan commented on code in PR #5662: URL: https://github.com/apache/iceberg/pull/5662#discussion_r959097698

docs/spark-queries.md:

````
@@ -318,12 +318,20 @@ To show a table's current partitions:
 SELECT * FROM prod.db.table.partitions
 ```

-| partition | record_count | file_count |
-| -- | -- | -- |
-| {20211001, 11}| 1| 1|
-| {20211002, 11}| 1| 1|
-| {20211001, 10}| 1| 1|
-| {20211002, 10}| 1| 1|
+If this table is not partitioned
````

Review Comment: addressed it.
[GitHub] [iceberg] badbye commented on issue #2586: Add geometry type to iceberg
badbye commented on issue #2586: URL: https://github.com/apache/iceberg/issues/2586#issuecomment-1232392801

To fully support geometry, there are lots of things to do:

1. Add a geometry type.
2. Partitioning.
3. Filtering.
4. Writing and reading.

First, we must figure out how to store geometry in Parquet and Avro files. [geomesa](https://github.com/locationtech/geomesa/tree/main/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/parquet) already did it, and [geoparquet](https://github.com/opengeospatial/geoparquet) is trying to set up a standard. What about Avro? No idea yet.

Second, use query engines like Spark to read data from sources and write geometry records into files (since Iceberg only offers APIs to append files, not records).

Finally, (conditional) reading is not that hard to do. My team is working on it. Hopefully, we can make it by the end of 2022.
[GitHub] [iceberg] dmgcodevil opened a new issue, #5675: Limit the number of files for rewrite/compaction action
dmgcodevil opened a new issue, #5675: URL: https://github.com/apache/iceberg/issues/5675

### Query engine

Flink

### Question

We have a streaming Flink job that continuously consumes records from Kafka and stores them in Iceberg. The [RewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java) is getting slower over time, and the `TaskManager` starts to fail with OOM. Is it possible to limit the number of data files for compaction? I have the following ideas:

1. Modify https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java#L49 to limit the number of `FileScanTask`s in a `CombinedScanTask`. We can create some sort of _wrapper_:

```java
class BoundedCombinedScanTask implements CombinedScanTask {
  private final CombinedScanTask original;
  private final int limit;

  public BoundedCombinedScanTask(CombinedScanTask original, int limit) {
    this.original = original;
    this.limit = limit;
  }

  @Override
  public Collection<FileScanTask> files() {
    return original.files().stream().limit(limit).collect(Collectors.toList());
  }

  @Override
  public CombinedScanTask asCombinedScanTask() {
    return original;
  }
}
```

The Flink code would then look as follows:

```java
protected List<DataFile> rewriteDataForTasks(final List<CombinedScanTask> combinedScanTasks) {
  final List<CombinedScanTask> boundedCombinedScanTasks =
      combinedScanTasks.stream()
          .map(t -> new BoundedCombinedScanTask(t, 100))
          .collect(Collectors.toList());

  int size = boundedCombinedScanTasks.size();
  int parallelism = Math.min(size, this.maxParallelism);
  DataStream<CombinedScanTask> dataStream = this.env.fromCollection(boundedCombinedScanTasks);
  RowDataRewriter rowDataRewriter =
      new RowDataRewriter(this.table(), this.caseSensitive(), this.fileIO(), this.encryptionManager());
  try {
    return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
  } catch (Exception var7) {
    throw new RuntimeException("Rewrite data file error.", var7);
  }
}
```

2. Modify [BaseRewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java) at https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java#L268 so that each group contains at most N files (a sketch of this follows below).
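As a complement to option 2, a hedged sketch of how a grouping step could cap group sizes. `groupedTasks` and `maxFilesPerGroup` are assumed names, and this only illustrates the idea, not the actual `BaseRewriteDataFilesAction` code.

```java
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class GroupCapper {
  // Split each oversized rewrite group into chunks of at most maxFilesPerGroup
  // files, so a single compaction task never has to hold too many inputs.
  static List<List<FileScanTask>> capGroups(
      List<List<FileScanTask>> groupedTasks, int maxFilesPerGroup) {
    List<List<FileScanTask>> capped = Lists.newArrayList();
    for (List<FileScanTask> group : groupedTasks) {
      capped.addAll(Lists.partition(group, maxFilesPerGroup));
    }
    return capped;
  }
}
```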
[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
stevenzwu commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959153410

flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

```
@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;
```

Review Comment: nit: I think we probably don't need this boolean flag. Instead, we can mark the writer as null after `writer.complete()`. Then we can just use the null check on the writer inside the `flush()` method.
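A minimal sketch of that suggestion, with the surrounding `IcebergStreamWriter` details elided and the `emit(...)` helper assumed; only the null-check pattern itself is the point.

```java
// Sketch: treat "writer == null" as "already flushed", instead of a boolean flag.
private void flush() throws IOException {
  if (writer == null) {
    return; // endInput() already completed the writer; nothing left to emit
  }

  // Hand the completed data/delete files to the downstream committer.
  emit(writer.complete());

  // Null out so a later close()/endInput() cannot complete the writer twice.
  writer = null;
}
```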
[GitHub] [iceberg-docs] pvary commented on pull request #131: Adding vendors page
pvary commented on PR #131: URL: https://github.com/apache/iceberg-docs/pull/131#issuecomment-1232472565

@samredai: I see that all of the comments were fixed. If you also think that the page is ready to be pushed, then I would be happy to merge. Thanks, Peter
[GitHub] [iceberg] kbendick commented on pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
kbendick commented on PR #5642: URL: https://github.com/apache/iceberg/pull/5642#issuecomment-1232500424

Hey @xuzhiwen1255, thanks for the patch! I've been out very sick, but this seems important. I'll do my best to take a look as soon as possible. Thanks Steven for reviewing.
[GitHub] [iceberg] dotjdk opened a new issue, #5676: core: Dropping an old partition column causes NPE (and corrupt metadata on v2 tables)
dotjdk opened a new issue, #5676: URL: https://github.com/apache/iceberg/issues/5676

### Apache Iceberg version

0.14.0 (latest release)

### Query engine

Spark

### Please describe the bug

On a format version 2 table, dropping an old partition column on an Iceberg table causes a `NullPointerException` in `PartitionSpecBuilder`, and every subsequent operation on the table throws the same exception.

```java
java.lang.NullPointerException: Cannot find source column: 2
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:963)
  at org.apache.iceberg.PartitionSpec$Builder.add(PartitionSpec.java:517)
  at org.apache.iceberg.UnboundPartitionSpec.copyToBuilder(UnboundPartitionSpec.java:56)
  at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:44)
  at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:87)
```

On a v1 table, the table is still accessible afterwards, but the `ALTER TABLE` throws the same NPE. The issue is easily reproducible using the following script in a Spark shell:

```sql
CREATE TABLE data.test_table (ts timestamp not null, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts);
ALTER TABLE data.test_table SET TBLPROPERTIES ('format-version' = '2');
ALTER TABLE data.test_table REPLACE PARTITION FIELD day_of_ts WITH days(ts);
ALTER TABLE data.test_table DROP COLUMN day_of_ts;
REFRESH TABLE data.test_table;
SELECT * FROM data.test_table;
```

On closer inspection of the metadata, I see that on a v1 table the metadata is not updated when dropping the old partition field, which explains why the table still works on v1 afterwards, but I also don't see what the issue is with the v2 metadata.

I am using Spark Shell on Spark 3.3.0 with Iceberg 0.14.0.
[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
kbendick commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959194058

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: Which `writer.complete()` call are you suggesting? It seems to me that `writer.complete()` is called in `flush`, which is explicitly called in multiple places.
[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5662: Doc: Update doc to display the results of the table partitions query
szehon-ho commented on code in PR #5662: URL: https://github.com/apache/iceberg/pull/5662#discussion_r959194801

## docs/spark-queries.md:

@@ -318,12 +318,15 @@ To show a table's current partitions:
 SELECT * FROM prod.db.table.partitions
 ```

-| partition | record_count | file_count |
-| -- | -- | -- |
-| {20211001, 11} | 1 | 1 |
-| {20211002, 11} | 1 | 1 |
-| {20211001, 10} | 1 | 1 |
-| {20211002, 10} | 1 | 1 |
+| partition | record_count | file_count | spec_id |
+| -- | -- | -- | -- |
+| {20211001, 11} | 1 | 1 | 0 |
+| {20211002, 11} | 1 | 1 | 0 |
+| {20211001, 10} | 1 | 1 | 0 |
+| {20211002, 10} | 1 | 1 | 0 |
+
+Note:
+If this table is non-partitioned, the resultSet will contain record_count and file_count only.

Review Comment: Nit: "resultSet" is kind of specific, as this is just about a table. Can we just omit it, like: "..., it will contain only the record_count and file_count columns."
[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
xuzhiwen1255 commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959197838

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment:
> nit: I think we probably don't need this boolean flag. Instead, we can mark writer as null after `writer.complete()`. Then we can just use the null check on writer inside the `flush()` method.

Could we add an `isClosed` method to the writer to determine whether it has been closed?
[GitHub] [iceberg] xuzhiwen1255 commented on pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
xuzhiwen1255 commented on PR #5642: URL: https://github.com/apache/iceberg/pull/5642#issuecomment-1232511957

> Hey @xuzhiwen1255, thanks for the patch!
>
> I've been out very sick, but this seems important. I'll do my best to take a look as soon as possible. Thanks, Steven, for reviewing.

Thank you for your reply. Please take care of your health while you work. I wish you a speedy recovery.
[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
kbendick commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959213199

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: Oh, never mind, I see: we're calling create and instantiating a new writer after the other places that call `flush`, but not after this one. Arguably we don't need the variable then. But we should consider cleaning up the comment above and adding information about the `null` check being used as the guard here, or where that check happens.
[GitHub] [iceberg] lvyanquan commented on a diff in pull request #5662: Doc: Update doc to display the results of the table partitions query
lvyanquan commented on code in PR #5662: URL: https://github.com/apache/iceberg/pull/5662#discussion_r959213698

## docs/spark-queries.md:

@@ -318,12 +318,15 @@ To show a table's current partitions:
 SELECT * FROM prod.db.table.partitions
 ```

-| partition | record_count | file_count |
-| -- | -- | -- |
-| {20211001, 11} | 1 | 1 |
-| {20211002, 11} | 1 | 1 |
-| {20211001, 10} | 1 | 1 |
-| {20211002, 10} | 1 | 1 |
+| partition | record_count | file_count | spec_id |
+| -- | -- | -- | -- |
+| {20211001, 11} | 1 | 1 | 0 |
+| {20211002, 11} | 1 | 1 | 0 |
+| {20211001, 10} | 1 | 1 | 0 |
+| {20211002, 10} | 1 | 1 | 0 |
+
+Note:
+If this table is non-partitioned, the resultSet will contain record_count and file_count only.

Review Comment: Thanks for your suggestion; addressed.
[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
xuzhiwen1255 commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959214446

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: I understand. I'll go and fix it.
[GitHub] [iceberg] tongwei opened a new issue, #5677: Why locality is disable default when FileSystem scheme is not hdfs
tongwei opened a new issue, #5677: URL: https://github.com/apache/iceberg/issues/5677

### Query engine

Spark

### Question

When I tested Iceberg with Alluxio and Spark, I noticed that locality is disabled by default when the FileSystem scheme is not hdfs. To enable it, I can only use `spark.read.option("locality", true).table("table_name")` to make Spark use locality. Why is locality disabled when the FileSystem scheme is not hdfs? Can we enable it by default?
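For reference, the per-read workaround the reporter describes would look roughly like this with Spark's Java API. The session setup and the table name are placeholders; only the `"locality"` read option comes from the issue above:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LocalityReadExample {
  public static void main(String[] args) {
    // Placeholder session setup; in practice the catalog pointing at the
    // Iceberg table (here backed by Alluxio) must already be configured.
    SparkSession spark = SparkSession.builder()
        .appName("locality-read-example")
        .getOrCreate();

    // Opt in to locality-aware split planning for this single read; this is
    // the per-read "locality" option mentioned in the question above.
    Dataset<Row> df = spark.read()
        .option("locality", true)
        .table("db.table_name"); // placeholder table name

    df.show();
  }
}
```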
[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
kbendick commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959214832

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: I agree that a method might be helpful for readability. `isClosed` seems odd to me personally, as there is a `close` method on the class and we haven't necessarily called it yet. Do you suggest adding `isClosed` to the writer itself?
[GitHub] [iceberg] tongwei commented on issue #5677: Why locality is disable default when FileSystem scheme is not hdfs
tongwei commented on issue #5677: URL: https://github.com/apache/iceberg/issues/5677#issuecomment-1232531922

[  ](url)
[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
kbendick commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959217458

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: If the writer class isn't being touched (I haven't looked that closely yet), or if the method will be placed on this class, I'd consider naming the method something like `shouldFlushWriter`. But it does seem that checking for null is technically sufficient overall. Thanks, @xuzhiwen1255.
[GitHub] [iceberg] kbendick commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
kbendick commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959219452

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: Also, if it's possible to add a test for this, that would be great! However, the test seems like it might be flaky by nature, so if it's not consistently possible to test for this behavior, or a spy can't be used, or something, don't worry too much about it. But looking into it would be great!
[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
xuzhiwen1255 commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959222044

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: As for me, I think the null check is enough.
[GitHub] [iceberg] xuzhiwen1255 commented on a diff in pull request #5642: Flink: Fixed an issue where Flink batch entry was not accurate
xuzhiwen1255 commented on code in PR #5642: URL: https://github.com/apache/iceberg/pull/5642#discussion_r959223612

## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:

@@ -87,6 +89,7 @@ public void endInput() throws IOException {
     // remaining completed files to downstream before closing the writer so that we won't miss any
     // of them.
     flush();
+    ended = true;

Review Comment: The nice thing about adding a new method is that the code will look a little bit cleaner, and if we do add one, we'll name it along the lines you suggested, @kbendick.
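To make the outcome of this thread concrete, here is a minimal sketch of the null-as-guard pattern being discussed, assuming a simplified writer shape. It is an illustration of the idea only, not the actual `IcebergStreamWriter` code; the class, interface, and method bodies here are made up:

```java
import java.io.IOException;

// Hypothetical simplification of the pattern discussed above: after the
// bounded input ends, complete the writer once and null it out, so that a
// later flush() (for example from close()) becomes a no-op instead of
// emitting the same completed files downstream a second time.
public class StreamWriterSketch {

  interface TaskWriter {
    void complete() throws IOException; // emit completed data files downstream
  }

  private TaskWriter writer = createWriter();

  // Streaming path: flush at the checkpoint barrier, then start a new writer.
  public void prepareSnapshotPreBarrier() throws IOException {
    flush();
    writer = createWriter();
  }

  // Bounded (batch) path: flush the remaining completed files exactly once.
  public void endInput() throws IOException {
    flush();
    writer = null; // the null check inside flush() is the guard from here on
  }

  public void close() throws IOException {
    flush(); // safe: no-ops if endInput() already completed the writer
  }

  private void flush() throws IOException {
    if (writer == null) {
      return; // writer already completed; nothing left to flush
    }
    writer.complete();
  }

  private TaskWriter createWriter() {
    return () -> { /* complete(): collect and emit completed data files */ };
  }
}
```

Under this sketch, the boolean flag from the original patch becomes unnecessary: whether a flush is still needed can be derived entirely from whether the writer currently exists.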