Re: [PR] build(deps): bump github.com/apache/arrow/go/v16 from 16.0.0 to 16.1.0 [iceberg-go]

2024-06-24 Thread via GitHub


Fokko merged PR #97:
URL: https://github.com/apache/iceberg-go/pull/97


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] build(deps): bump github.com/hamba/avro/v2 from 2.21.1 to 2.22.1 [iceberg-go]

2024-06-24 Thread via GitHub


Fokko merged PR #94:
URL: https://github.com/apache/iceberg-go/pull/94


Re: [PR] build(deps): bump github.com/pterm/pterm from 0.12.78 to 0.12.79 [iceberg-go]

2024-06-24 Thread via GitHub


Fokko merged PR #93:
URL: https://github.com/apache/iceberg-go/pull/93


Re: [PR] Add the build from source section [iceberg-go]

2024-06-24 Thread via GitHub


Fokko merged PR #70:
URL: https://github.com/apache/iceberg-go/pull/70


Re: [PR] Add the build from source section [iceberg-go]

2024-06-24 Thread via GitHub


Fokko commented on PR #70:
URL: https://github.com/apache/iceberg-go/pull/70#issuecomment-2185823387

   Thanks for the PR @git-hulk, and @zeroshade for the review! 🙌


Re: [PR] Metadata Log Entries metadata table [iceberg-python]

2024-06-24 Thread via GitHub


HonahX commented on code in PR #667:
URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1650524201


##
pyiceberg/table/snapshots.py:
##
@@ -226,7 +226,8 @@ def __eq__(self, other: Any) -> bool:
 class Snapshot(IcebergBaseModel):
     snapshot_id: int = Field(alias="snapshot-id")
     parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
-    sequence_number: Optional[int] = Field(alias="sequence-number", default=None)
+    # cannot import `INITIAL_SEQUENCE_NUMBER` due to circular import
+    sequence_number: Optional[int] = Field(alias="sequence-number", default=0)

Review Comment:
   @kevinjqliu Thanks for spotting this! We definitely need to read snapshot.sequence_number as 0 for v1. However, as we have observed in the test outcome, making `sequence_number` default to 0 here leads to `sequence_number=0` being written to version 1 table metadata's snapshots, which is not allowed by the spec:
   ```
   Writing v1 metadata:
   Snapshot field sequence-number should not be written
   ```
   
   I think we may need a new [field_serializer](https://docs.pydantic.dev/latest/api/functional_serializers/#pydantic.functional_serializers.field_serializer) in the `TableMetadataCommonFields` class, or some other way to correct the behavior on write. WDYT?
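
   For reference, a minimal sketch of that direction, assuming pydantic v2 semantics. The model below is an illustrative stand-in, not the real `TableMetadataCommonFields`:

```python
from typing import List, Optional

from pydantic import BaseModel, Field, field_serializer


class Snapshot(BaseModel):
    snapshot_id: int = Field(alias="snapshot-id")
    # read-side default of 0 for v1, as in the change above
    sequence_number: Optional[int] = Field(alias="sequence-number", default=0)


class TableMetadataSketch(BaseModel):
    format_version: int = Field(alias="format-version")
    snapshots: List[Snapshot] = Field(default_factory=list)

    @field_serializer("snapshots")
    def _omit_v1_sequence_numbers(self, snapshots: List[Snapshot]) -> List[dict]:
        # v1 spec: sequence-number must not be written, so drop it on dump
        # while keeping the read-side default of 0
        exclude = {"sequence_number"} if self.format_version == 1 else None
        return [s.model_dump(by_alias=True, exclude=exclude) for s in snapshots]
```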



##
pyiceberg/table/__init__.py:
##
@@ -3827,6 +3827,40 @@ def _partition_summaries_to_rows(
         schema=manifest_schema,
     )
 
+def metadata_log_entries(self) -> "pa.Table":
+    import pyarrow as pa
+
+    from pyiceberg.table.snapshots import MetadataLogEntry
+
+    table_schema = pa.schema([
+        pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
+        pa.field("file", pa.string(), nullable=False),
+        pa.field("latest_snapshot_id", pa.int64(), nullable=True),
+        pa.field("latest_schema_id", pa.int32(), nullable=True),
+        pa.field("latest_sequence_number", pa.int64(), nullable=True),
+    ])
+
+    def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
+        latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
+        return {
+            "timestamp": metadata_entry.timestamp_ms,
+            "file": metadata_entry.metadata_file,
+            "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None,
+            "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None,
+            "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None,
+        }
+
+    # imitates `addPreviousFile` from Java
+    # https://github.com/apache/iceberg/blob/8248663a2a1ffddd2664ea37b45882455466f71c/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1450-L1451
+    metadata_log_entries = self.tbl.metadata.metadata_log + [
+        MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)

Review Comment:
   It seems this line acts more like https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66.
   Just curious about the reason you mention `addPreviousFile` here, since that seems more relevant when we update the metadata_log during a table commit.
   
   BTW, this reminds me that currently non-REST catalogs do not update the metadata_log field during commit.



Re: [PR] Encryption integration and test [iceberg]

2024-06-24 Thread via GitHub


hsiang-c commented on code in PR #5544:
URL: https://github.com/apache/iceberg/pull/5544#discussion_r1650529269


##
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##
@@ -137,7 +163,47 @@ protected String tableName() {
 
   @Override
   public FileIO io() {
-    return fileIO;
+    if (encryptionManager == null) {
+      encryptionManager = encryption();
+    }
+
+    if (!encryptedTable) {
+      return fileIO;
+    }
+
+    if (encryptingFileIO != null) {
+      return encryptingFileIO;
+    }
+
+    encryptingFileIO = EncryptingFileIO.combine(fileIO, encryptionManager);
+    return encryptingFileIO;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    if (encryptionManager != null) {
+      return encryptionManager;
+    }
+
+    if (encryptionKeyId == null) {
+      encryptionPropsFromHms();

Review Comment:
   This LGTM now.
   
   The previous approach (i.e. calling `dekLength()` in `BaseMetastoreTableOperations`) would only set `encryptionDekLength` on the write path (the `writeNewMetadataFile` method). Therefore, Hive Catalog readers get the default -1 value.



Re: [PR] feat(exprs): Adding BooleanExpressions and Predicates [iceberg-go]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #91:
URL: https://github.com/apache/iceberg-go/pull/91#discussion_r1650517065


##
exprs.go:
##
@@ -0,0 +1,968 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+   "fmt"
+
+   "github.com/google/uuid"
+)
+
+//go:generate stringer -type=Operation -linecomment
+
+// Operation is an enum used for constants to define what operation a given
+// expression or predicate is going to execute.
+type Operation int
+
+const (
+   // do not change the order of these enum constants.
+   // they are grouped for quick validation of operation type by
+   // using <= and >= of the first/last operation in a group
+
+   OpTrue  Operation = iota // True
+   OpFalse  // False
+   // unary ops
+   OpIsNull  // IsNull
+   OpNotNull // NotNull
+   OpIsNan   // IsNaN
+   OpNotNan  // NotNaN
+   // literal ops
+   OpLT// LessThan
+   OpLTEQ  // LessThanEqual
+   OpGT// GreaterThan
+   OpGTEQ  // GreaterThanEqual
+   OpEQ// Equal
+   OpNEQ   // NotEqual
+   OpStartsWith// StartsWith
+   OpNotStartsWith // NotStartsWith
+   // set ops
+   OpIn// In
+   OpNotIn // NotIn
+   // boolean ops
+   OpNot // Not
+   OpAnd // And
+   OpOr  // Or
+)
+
+// Negate returns the inverse operation for a given op
+func (op Operation) Negate() Operation {
+   switch op {
+   case OpIsNull:
+   return OpNotNull
+   case OpNotNull:
+   return OpIsNull
+   case OpIsNan:
+   return OpNotNan
+   case OpNotNan:
+   return OpIsNan
+   case OpLT:
+   return OpGTEQ
+   case OpLTEQ:
+   return OpGT
+   case OpGT:
+   return OpLTEQ
+   case OpGTEQ:
+   return OpLT
+   case OpEQ:
+   return OpNEQ
+   case OpNEQ:
+   return OpEQ
+   case OpIn:
+   return OpNotIn
+   case OpNotIn:
+   return OpIn
+   case OpStartsWith:
+   return OpNotStartsWith
+   case OpNotStartsWith:
+   return OpStartsWith
+   default:
+   panic("no negation for operation " + op.String())
+   }
+}
+
+// FlipLR returns the correct operation to use if the left and right operands
+// are flipped.
+func (op Operation) FlipLR() Operation {
+   switch op {
+   case OpLT:
+   return OpGT
+   case OpLTEQ:
+   return OpGTEQ
+   case OpGT:
+   return OpLT
+   case OpGTEQ:
+   return OpLTEQ
+   case OpAnd:
+   return OpAnd
+   case OpOr:
+   return OpOr
+   default:
+   panic("no left-right flip for operation: " + op.String())
+   }
+}
+
+// BooleanExpression represents a full expression which will evaluate to a
+// boolean value such as GreaterThan or StartsWith, etc.
+type BooleanExpression interface {
+   fmt.Stringer
+   Op() Operation
+   Negate() BooleanExpression
+   Equals(BooleanExpression) bool
+}
+
+// AlwaysTrue is the boolean expression "True"
+type AlwaysTrue struct{}
+
+func (AlwaysTrue) String() string            { return "AlwaysTrue()" }
+func (AlwaysTrue) Op() Operation             { return OpTrue }
+func (AlwaysTrue) Negate() BooleanExpression { return AlwaysFalse{} }
+func (AlwaysTrue) Equals(other BooleanExpression) bool {
+   _, ok := other.(AlwaysTrue)
+   return ok
+}
+
+// AlwaysFalse is the boolean expression "False"
+type AlwaysFalse struct{}
+
+func (AlwaysFalse) String() string            { return "AlwaysFalse()" }
+func (AlwaysFalse) Op() Operation             { return OpFalse }
+func (AlwaysFalse) Negate() BooleanExpression { return AlwaysTrue{} }
+func (AlwaysFalse) Equals(other BooleanExpression) bool {
+   _, ok := other.(AlwaysFalse)
+   return ok
+}
+
+type NotExpr struct {
+   child BooleanExpression
+}
+
+// NewNot creates a BooleanExpression representing a "Not" operation on the given
+// argument. It will optimize slightly though:
+//
+// If t

Re: [PR] Build: Bump mkdocs-material from 9.5.26 to 9.5.27 [iceberg]

2024-06-24 Thread via GitHub


Fokko merged PR #10555:
URL: https://github.com/apache/iceberg/pull/10555


Re: [PR] Build: Bump software.amazon.awssdk:bom from 2.26.3 to 2.26.7 [iceberg]

2024-06-24 Thread via GitHub


Fokko merged PR #10554:
URL: https://github.com/apache/iceberg/pull/10554


Re: [PR] Build: Bump software.amazon.awssdk:bom from 2.26.3 to 2.26.7 [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on PR #10554:
URL: https://github.com/apache/iceberg/pull/10554#issuecomment-2185905602

   Thanks for the review @singhpk234 


Re: [PR] Build: Bump nessie from 0.90.4 to 0.91.1 [iceberg]

2024-06-24 Thread via GitHub


Fokko merged PR #10551:
URL: https://github.com/apache/iceberg/pull/10551


Re: [PR] Build: Bump nessie from 0.90.4 to 0.91.1 [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on PR #10551:
URL: https://github.com/apache/iceberg/pull/10551#issuecomment-2185914294

   Thanks for reviewing @ajantha-bhat 👍


Re: [PR] Build: Bump org.roaringbitmap:RoaringBitmap from 1.0.6 to 1.1.0 [iceberg]

2024-06-24 Thread via GitHub


Fokko merged PR #10552:
URL: https://github.com/apache/iceberg/pull/10552


Re: [PR] DynFields, DynMethods code cleanup [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #10543:
URL: https://github.com/apache/iceberg/pull/10543#discussion_r1650614392


##
common/src/main/java/org/apache/iceberg/common/DynMethods.java:
##
@@ -313,6 +317,8 @@ public Builder impl(Class<?> targetClass, Class<?>... argClasses) {
       return this;
     }
 
+    /** @deprecated since 1.6.0, will be removed in 1.7.0 */
+    @Deprecated

Review Comment:
   I think we can deprecate these



Re: [PR] DynFields, DynMethods code cleanup [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #10543:
URL: https://github.com/apache/iceberg/pull/10543#discussion_r1650614916


##
common/src/main/java/org/apache/iceberg/common/DynMethods.java:
##
@@ -161,6 +161,8 @@ private BoundMethod(UnboundMethod method, Object receiver) {
       this.receiver = receiver;
     }
 
+    /** @deprecated since 1.6.0, will be removed in 1.7.0 */

Review Comment:
   I don't mind having a few additional methods around. They come in pairs of `{invoke, invokeChecked}`; it looks weird if they are sometimes defined and sometimes missing.



Re: [PR] Build: Bump parquet from 1.13.1 to 1.14.1 [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on PR #10553:
URL: https://github.com/apache/iceberg/pull/10553#issuecomment-2185975514

   Blocked by the Gradle update 👍


[I] support bloom-filter writing [iceberg-python]

2024-06-24 Thread via GitHub


raphaelauv opened a new issue, #850:
URL: https://github.com/apache/iceberg-python/issues/850

   ### Feature Request / Improvement
   
   Hi, PyIceberg cannot yet write bloom filters; this would be a great feature. Thanks, all!


Re: [I] [Bug]: Can't create/update tables in REST nessie catalog via Spark. Iceberg Cannot read field "formatVersion" because "x0" is null [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on issue #10533:
URL: https://github.com/apache/iceberg/issues/10533#issuecomment-2186001634

   @ajantha-bhat do you have time to dig into this?


Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650701857


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -159,18 +160,30 @@ public void commit(TableMetadata base, TableMetadata metadata) {
     int nextVersion = (current.first() != null ? current.first() : 0) + 1;
     Path finalMetadataFile = metadataFilePath(nextVersion, codec);
     FileSystem fs = getFileSystem(tempMetadataFile, conf);
-
-    // this rename operation is the atomic commit operation
-    renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion);
-
-    LOG.info("Committed a new metadata file {}", finalMetadataFile);
-
-    // update the best-effort version pointer
-    writeVersionHint(nextVersion);
-
-    deleteRemovedMetadataFiles(base, metadata);
-
-    this.shouldRefresh = true;
+    boolean versionCommitSuccess = false;
+    try {
+      fs.delete(versionHintFile(), false /* recursive delete*/);

Review Comment:
   > Sir, the purpose of versionHintFile is simply to speed up the reading of the latest version. It's just an index file.
   
   That is correct, but it is also highly coupled to the Iceberg project, since Iceberg optimizes for object stores. Object stores are terrible at listing files since the responses are often paged, which is both slow and costly; the pointer in the `versionHintFile` therefore makes this much faster.
   
   > In a file system that is basically POSIX compliant, we theoretically don't need lockManager to prevent multiple clients from committing concurrently, because fs.rename can do that.
   
   How would that work? I think a race condition cannot be avoided: it would pass 99.99% of the time, but the clients can still overwrite each other because they don't know that multiple processes are writing to the table.
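
   To make the trade-off concrete, here is a toy sketch (plain Python, not Iceberg code) contrasting the single hint-file read with listing-based version discovery; the dict is a stand-in for an object store:

```python
import re

# stand-in for an object store keyed by path
store = {
    "tbl/metadata/v1.metadata.json": "...",
    "tbl/metadata/v2.metadata.json": "...",
    "tbl/metadata/version-hint.text": "2",
}


def current_version_with_hint(store: dict) -> int:
    # one small GET of the pointer file
    return int(store["tbl/metadata/version-hint.text"])


def current_version_by_listing(store: dict) -> int:
    # in reality a paged LIST over the metadata/ prefix: slow and billed per request
    versions = [
        int(m.group(1))
        for key in store
        if (m := re.search(r"/v(\d+)\.metadata\.json$", key))
    ]
    return max(versions)


assert current_version_with_hint(store) == current_version_by_listing(store) == 2
```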



Re: [PR] Api: Track partition statistics via TableMetadata [iceberg]

2024-06-24 Thread via GitHub


advancedxy commented on code in PR #8502:
URL: https://github.com/apache/iceberg/pull/8502#discussion_r1650705615


##
core/src/main/java/org/apache/iceberg/TableMetadataParser.java:
##
@@ -481,6 +488,13 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
       statisticsFiles = ImmutableList.of();
     }
 
+    List<PartitionStatisticsFile> partitionStatisticsFiles;
+    if (node.has(PARTITION_STATISTICS)) {
+      partitionStatisticsFiles = partitionStatsFilesFromJson(node.get(PARTITION_STATISTICS));
+    } else {
+      partitionStatisticsFiles = ImmutableList.of();
+    }
+

Review Comment:
   Hi @ajantha-bhat and @aokolnychyi, I have a question about this implementation as I'm exploring adding new fields to TableMetadata. Suppose the table `db.table`'s partition stats are updated by the new version of Iceberg via UpdatePartitionStatistics. After that, some old version of the Iceberg library or the PyIceberg client produces a new commit to this table. Per my understanding, that writer will produce TableMetadata without `PARTITION_STATISTICS`, since it knows nothing about `PARTITION_STATISTICS`, which effectively loses that info for the table. A toy round-trip illustrating this is sketched below.
   
   Do you have any solutions or ideas on how to prevent such cases? I can think of some potential ideas, such as:
   1. Upgrade the format_version to a new one whenever we need to add new fields to table metadata; all old clients will then be rejected by the version check.
   2. Define a writer_version field: old clients can read metadata produced by new clients, but writers with old versions will be rejected.
   3. Move the check to the REST catalog service?
   
   I feel it's too heavy to do a format upgrade when only adding new fields to TableMetadata.
   
   Do you have any other ideas? Really appreciate your inputs.
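
   The sketch mentioned above (plain Python; property names follow the table-spec JSON, the rest is illustrative) showing how a writer that does not know a field silently drops it on its next commit:

```python
import json

# the set of metadata fields an old client knows how to parse (abbreviated)
KNOWN_FIELDS = {"format-version", "last-updated-ms", "statistics"}

new_metadata = {
    "format-version": 2,
    "last-updated-ms": 1719244800000,
    "statistics": [],
    "partition-statistics": [{"snapshot-id": 1, "statistics-path": "s3://..."}],
}

# the old client keeps only the fields it understands...
parsed = {k: v for k, v in new_metadata.items() if k in KNOWN_FIELDS}

# ...so its next commit rewrites the metadata without the unknown field
assert "partition-statistics" not in json.dumps(parsed)
```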



Re: [I] Iceberg Spark streaming skips rows of data [iceberg]

2024-06-24 Thread via GitHub


cccs-jc commented on issue #10156:
URL: https://github.com/apache/iceberg/issues/10156#issuecomment-2186073673

   @singhpk234 any progress on this issue?


Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650711386


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
       if (fs.exists(dst)) {
         throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
       }
-
-      if (!fs.rename(src, dst)) {
-        CommitFailedException cfe =
-            new CommitFailedException("Failed to commit changes using rename: %s", dst);
-        RuntimeException re = tryDelete(src);
-        if (re != null) {
-          cfe.addSuppressed(re);
-        }
-        throw cfe;
-      }
-    } catch (IOException e) {
-      CommitFailedException cfe =
-          new CommitFailedException(e, "Failed to commit changes using rename: %s", dst);
-      RuntimeException re = tryDelete(src);
-      if (re != null) {
-        cfe.addSuppressed(re);
+      if (!nextVersionIsLatest(nextVersion, fs)) {

Review Comment:
   If the locking is implemented correctly, then this should not happen, if I follow the logic correctly. This operation will list the prefix, which can be costly:
   - Doing additional operations against the filesystem/object store
   - Keeping the lock for a longer time than needed, slowing down the commit process



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650714966


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
       if (fs.exists(dst)) {
         throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
       }
-
-      if (!fs.rename(src, dst)) {
-        CommitFailedException cfe =
-            new CommitFailedException("Failed to commit changes using rename: %s", dst);
-        RuntimeException re = tryDelete(src);
-        if (re != null) {
-          cfe.addSuppressed(re);
-        }
-        throw cfe;
-      }
-    } catch (IOException e) {
-      CommitFailedException cfe =
-          new CommitFailedException(e, "Failed to commit changes using rename: %s", dst);
-      RuntimeException re = tryDelete(src);
-      if (re != null) {
-        cfe.addSuppressed(re);
+      if (!nextVersionIsLatest(nextVersion, fs)) {
+        throw new CommitFailedException(
+            "Cannot commit version [%d] because it is smaller or much larger than the current latest version [%d].Are there other clients running in parallel with the current task?",
+            nextVersion, findVersionWithOutVersionHint(fs));
       }
-      throw cfe;
+      return renameMetaDataFileAndCheck(fs, src, dst);
     } finally {
       if (!lockManager.release(dst.toString(), src.toString())) {
         LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src);
       }
     }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
+  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+    return Util.getFs(path, hadoopConf);
+  }
+
+  @VisibleForTesting
+  boolean checkMetaDataFileRenameSuccess(
+      FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws IOException {
+    return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile);
+  }
+
+  @VisibleForTesting
+  boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile)
+      throws IOException {
+    return fs.rename(tempMetaDataFile, finalMetaDataFile);
+  }
+
+  private boolean renameCheck(
+      FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable rootError) {
     try {
-      io().deleteFile(path.toString());
-      return null;
-    } catch (RuntimeException re) {
-      return re;
+      return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, finalMetaDataFile);
+    } catch (Throwable e) {
+      LOG.error(
+          "No correctness check can be performed.src=[{}],dst=[{}].",
+          tempMetaDataFile,
+          finalMetaDataFile,
+          e);
+      String msg =
+          String.format(
+              "Exception thrown when renaming [%s] to [%s].Also no correctness check can be performed.",
+              tempMetaDataFile, finalMetaDataFile);
+      throw new CommitStateUnknownException(msg, rootError != null ? rootError : e);
     }
   }
 
-  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
-    return Util.getFs(path, hadoopConf);
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) {
+    try {
+      return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+    } catch (Throwable e) {

Review Comment:
   I think it is best to be more specific here:
   ```suggestion
   } catch (IOException e) {
   ```



Re: [I] Issue with CALL parsing [iceberg]

2024-06-24 Thread via GitHub


qianzhen0 commented on issue #8343:
URL: https://github.com/apache/iceberg/issues/8343#issuecomment-2186085314

   In my case, adding `spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` solved the problem.
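
   For anyone hitting the same parse error, a minimal PySpark sketch with the extension registered; the catalog name `my_catalog`, catalog type, and warehouse path are illustrative placeholders:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("iceberg-call-example")
    # without this extension, CALL statements fail to parse
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "/tmp/warehouse")
    .getOrCreate()
)

# Iceberg stored procedures are invoked via CALL once the extension is active
spark.sql("CALL my_catalog.system.rollback_to_snapshot('db.tbl', 12345)")
```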


Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650722647


##
core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java:
##
@@ -206,6 +210,139 @@ public void testFailedCommit() throws Exception {
     Assertions.assertThat(manifests).as("Should contain 0 Avro manifest files").isEmpty();
   }
 
+  @Test
+  public void testCommitFailedBeforeChangeVersionHint() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    BaseTable baseTable = (BaseTable) table;
+    HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations();
+
+    HadoopTableOperations spyOps2 = spy(tableOperations);
+    doReturn(1).when(spyOps2).findVersionWithOutVersionHint(any());
+    TableMetadata metadataV1 = spyOps2.current();
+    SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build();
+    TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
+    assertThatThrownBy(() -> spyOps2.commit(metadataV1, metadataV2))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Are there other clients running in parallel with the current task?");
+
+    HadoopTableOperations spyOps3 = spy(tableOperations);
+    doReturn(false).when(spyOps3).nextVersionIsLatest(any(), any());
+    assertCommitNotChangeVersion(
+        baseTable,
+        spyOps3,
+        CommitFailedException.class,
+        "Are there other clients running in parallel with the current task?");
+
+    HadoopTableOperations spyOps4 = spy(tableOperations);
+    doThrow(new RuntimeException("FileSystem crash!"))
+        .when(spyOps4)
+        .renameMetaDataFileAndCheck(any(), any(), any());
+    assertCommitNotChangeVersion(
+        baseTable, spyOps4, CommitFailedException.class, "FileSystem crash!");
+  }
+
+  @Test
+  public void testCommitFailedAndCheckFailed() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    BaseTable baseTable = (BaseTable) table;
+    HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations();
+    HadoopTableOperations spyOps = spy(tableOperations);
+    doThrow(new RuntimeException("FileSystem crash!"))

Review Comment:
   I think `IOException`s are more appropriate here.



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650728447


##
core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java:
##
@@ -206,6 +210,139 @@ public void testFailedCommit() throws Exception {
     Assertions.assertThat(manifests).as("Should contain 0 Avro manifest files").isEmpty();
   }
 
+  @Test
+  public void testCommitFailedBeforeChangeVersionHint() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    BaseTable baseTable = (BaseTable) table;
+    HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations();
+
+    HadoopTableOperations spyOps2 = spy(tableOperations);
+    doReturn(1).when(spyOps2).findVersionWithOutVersionHint(any());
+    TableMetadata metadataV1 = spyOps2.current();
+    SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build();
+    TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
+    assertThatThrownBy(() -> spyOps2.commit(metadataV1, metadataV2))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Are there other clients running in parallel with the current task?");
+
+    HadoopTableOperations spyOps3 = spy(tableOperations);
+    doReturn(false).when(spyOps3).nextVersionIsLatest(any(), any());
+    assertCommitNotChangeVersion(
+        baseTable,
+        spyOps3,
+        CommitFailedException.class,
+        "Are there other clients running in parallel with the current task?");
+
+    HadoopTableOperations spyOps4 = spy(tableOperations);
+    doThrow(new RuntimeException("FileSystem crash!"))
+        .when(spyOps4)
+        .renameMetaDataFileAndCheck(any(), any(), any());
+    assertCommitNotChangeVersion(
+        baseTable, spyOps4, CommitFailedException.class, "FileSystem crash!");
+  }
+
+  @Test
+  public void testCommitFailedAndCheckFailed() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    BaseTable baseTable = (BaseTable) table;
+    HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations();
+    HadoopTableOperations spyOps = spy(tableOperations);
+    doThrow(new RuntimeException("FileSystem crash!"))

Review Comment:
   @Fokko Okay



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


Fokko commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650739534


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -355,71 +475,90 @@ int findVersion() {
    * an attempt will be made to delete the source file.
    *
    * @param fs the filesystem used for the rename
-   * @param src the source file
-   * @param dst the destination file
+   * @param tempMetaDataFile the source file
+   * @param finalMetaDataFile the destination file
    */
-  private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
+  @VisibleForTesting
+  boolean commitNewVersion(
+      FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer nextVersion)
+      throws IOException {
     try {
-      if (!lockManager.acquire(dst.toString(), src.toString())) {
+      if (!lockManager.acquire(finalMetaDataFile.toString(), finalMetaDataFile.toString())) {
         throw new CommitFailedException(
-            "Failed to acquire lock on file: %s with owner: %s", dst, src);
+            "Failed to acquire lock on file: %s with owner: %s",
+            finalMetaDataFile, tempMetaDataFile);
       }
 
-      if (fs.exists(dst)) {
-        throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
-      }
-
-      if (!fs.rename(src, dst)) {
-        CommitFailedException cfe =
-            new CommitFailedException("Failed to commit changes using rename: %s", dst);
-        RuntimeException re = tryDelete(src);
-        if (re != null) {
-          cfe.addSuppressed(re);
-        }
-        throw cfe;
+      if (fs.exists(finalMetaDataFile)) {
+        throw new CommitFailedException(
+            "Version %d already exists: %s", nextVersion, finalMetaDataFile);
       }
-    } catch (IOException e) {
-      CommitFailedException cfe =
-          new CommitFailedException(e, "Failed to commit changes using rename: %s", dst);
-      RuntimeException re = tryDelete(src);
-      if (re != null) {
-        cfe.addSuppressed(re);
+      // maybe too heavy.?
+      if (!nextVersionIsLatest(nextVersion, fs)) {
+        // In the case of concurrent execution,
+        // verify that the version that is ready to be committed at a time is the latest version.
+        throw new CommitFailedException("Version %d too old: %s", nextVersion, finalMetaDataFile);
      }
-      throw cfe;
+      return renameMetaDataFileAndCheck(fs, tempMetaDataFile, finalMetaDataFile);
     } finally {
-      if (!lockManager.release(dst.toString(), src.toString())) {
-        LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src);
+      if (!lockManager.release(finalMetaDataFile.toString(), finalMetaDataFile.toString())) {
+        LOG.warn(
+            "Failed to release lock on file: {} with owner: {}",
+            finalMetaDataFile,
+            tempMetaDataFile);
       }
     }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
-    try {
-      io().deleteFile(path.toString());
-      return null;
-    } catch (RuntimeException re) {
-      return re;
-    }
+  private void cleanUncommittedMeta(Path src) {
+    io().deleteFile(src.toString());
   }
 
   protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
     return Util.getFs(path, hadoopConf);
   }
 
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) {
+    try {
+      // The most important step. There must be no mistakes in this step.
+      // Even if it does, we should stop everything.
+      return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+    } catch (Throwable e) {
+      LOG.error("There were some problems with submitting the new version.", e);
+      try {
+        if (newMetadataExists(fs, finalMetaDataFile) && !tempMetadataExists(fs, tempMetaDataFile)) {
+          return true;
+        } else {
+          throw new CommitFailedException(e);
+        }
+      } catch (CommitFailedException e1) {
+        throw e1;
+      } catch (Throwable e2) {
+        throw new CommitStateUnknownException(e2);
+      }
+    }
+  }
+
+  @VisibleForTesting

Review Comment:
   With OOP programming you don't want to assert the calls; you want to validate the behavior of the HadoopTableOperations. To do this, you would create scenarios where you use Mockito to let certain operations fail (for example `fs.rename`), and then you assert the desired state. Opening up the object for testing is in general considered a bad practice.
   
   An example can be found here: https://github.com/apache/iceberg/blob/29fd2a0cb940fd16aa5bfb8003ec1354e678c949/core/src/test/java/org/apache/iceberg/hadoo

Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650741211


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
       if (fs.exists(dst)) {
         throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
       }
-
-      if (!fs.rename(src, dst)) {
-        CommitFailedException cfe =
-            new CommitFailedException("Failed to commit changes using rename: %s", dst);
-        RuntimeException re = tryDelete(src);
-        if (re != null) {
-          cfe.addSuppressed(re);
-        }
-        throw cfe;
-      }
-    } catch (IOException e) {
-      CommitFailedException cfe =
-          new CommitFailedException(e, "Failed to commit changes using rename: %s", dst);
-      RuntimeException re = tryDelete(src);
-      if (re != null) {
-        cfe.addSuppressed(re);
+      if (!nextVersionIsLatest(nextVersion, fs)) {
+        throw new CommitFailedException(
+            "Cannot commit version [%d] because it is smaller or much larger than the current latest version [%d].Are there other clients running in parallel with the current task?",
+            nextVersion, findVersionWithOutVersionHint(fs));
       }
-      throw cfe;
+      return renameMetaDataFileAndCheck(fs, src, dst);
     } finally {
       if (!lockManager.release(dst.toString(), src.toString())) {
         LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src);
       }
     }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
+  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+    return Util.getFs(path, hadoopConf);
+  }
+
+  @VisibleForTesting
+  boolean checkMetaDataFileRenameSuccess(
+      FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws IOException {
+    return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile);
+  }
+
+  @VisibleForTesting
+  boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile)
+      throws IOException {
+    return fs.rename(tempMetaDataFile, finalMetaDataFile);
+  }
+
+  private boolean renameCheck(
+      FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable rootError) {
     try {
-      io().deleteFile(path.toString());
-      return null;
-    } catch (RuntimeException re) {
-      return re;
+      return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, finalMetaDataFile);
+    } catch (Throwable e) {
+      LOG.error(
+          "No correctness check can be performed.src=[{}],dst=[{}].",
+          tempMetaDataFile,
+          finalMetaDataFile,
+          e);
+      String msg =
+          String.format(
+              "Exception thrown when renaming [%s] to [%s].Also no correctness check can be performed.",
+              tempMetaDataFile, finalMetaDataFile);
+      throw new CommitStateUnknownException(msg, rootError != null ? rootError : e);
    }
   }
 
-  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
-    return Util.getFs(path, hadoopConf);
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) {
+    try {
+      return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+    } catch (Throwable e) {

Review Comment:
   > I think it is best to be more specific here:
   
   @Fokko 
   This is a crucial step. I think we should enlarge the scope of the exception detection, because at this point we may encounter OOM exceptions and so on. Detecting only IOExceptions would miss boundary cases such as runtime exceptions. Our intention is that whatever we encounter in this step, we can throw a CommitStateUnknownException, to ensure that we don't trigger other actions that affect the correctness of the data, such as file cleanup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650741211


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+throw new CommitFailedException(
+"Cannot commit version [%d] because it is smaller or much larger 
than the current latest version [%d].Are there other clients running in 
parallel with the current task?",
+nextVersion, findVersionWithOutVersionHint(fs));
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, src, dst);
 } finally {
   if (!lockManager.release(dst.toString(), src.toString())) {
 LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
+  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+return Util.getFs(path, hadoopConf);
+  }
+
+  @VisibleForTesting
+  boolean checkMetaDataFileRenameSuccess(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws 
IOException {
+return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile);
+  }
+
+  @VisibleForTesting
+  boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path 
finalMetaDataFile)
+  throws IOException {
+return fs.rename(tempMetaDataFile, finalMetaDataFile);
+  }
+
+  private boolean renameCheck(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable 
rootError) {
 try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
+  return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, 
finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error(
+  "No correctness check can be performed.src=[{}],dst=[{}].",
+  tempMetaDataFile,
+  finalMetaDataFile,
+  e);
+  String msg =
+  String.format(
+  "Exception thrown when renaming [%s] to [%s].Also no correctness 
check can be performed.",
+  tempMetaDataFile, finalMetaDataFile);
+  throw new CommitStateUnknownException(msg, rootError != null ? rootError 
: e);
 }
   }
 
-  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
-return Util.getFs(path, hadoopConf);
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {

Review Comment:
   > I think it is best to be more specific here:
   
   
   @Fokko 
   This is a crucial step. I think we should enlarge the scope of the exception 
detection. This is because at this point we may encounter OOM exceptions and so 
on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650741211


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+throw new CommitFailedException(
+"Cannot commit version [%d] because it is smaller or much larger 
than the current latest version [%d].Are there other clients running in 
parallel with the current task?",
+nextVersion, findVersionWithOutVersionHint(fs));
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, src, dst);
 } finally {
   if (!lockManager.release(dst.toString(), src.toString())) {
 LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
+  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+return Util.getFs(path, hadoopConf);
+  }
+
+  @VisibleForTesting
+  boolean checkMetaDataFileRenameSuccess(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws 
IOException {
+return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile);
+  }
+
+  @VisibleForTesting
+  boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path 
finalMetaDataFile)
+  throws IOException {
+return fs.rename(tempMetaDataFile, finalMetaDataFile);
+  }
+
+  private boolean renameCheck(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable 
rootError) {
 try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
+  return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, 
finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error(
+  "No correctness check can be performed.src=[{}],dst=[{}].",
+  tempMetaDataFile,
+  finalMetaDataFile,
+  e);
+  String msg =
+  String.format(
+  "Exception thrown when renaming [%s] to [%s].Also no correctness 
check can be performed.",
+  tempMetaDataFile, finalMetaDataFile);
+  throw new CommitStateUnknownException(msg, rootError != null ? rootError 
: e);
 }
   }
 
-  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
-return Util.getFs(path, hadoopConf);
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {

Review Comment:
   > I think it is best to be more specific here:
   
   
   @Fokko 
   This is a crucial step. I think we should enlarge the scope of the exception 
detection. This is because at this point we may encounter OOM exceptions and so 
on.
   Detecting only IO exceptions will miss boundary cases such as runtime 
exceptions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650741211


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+throw new CommitFailedException(
+"Cannot commit version [%d] because it is smaller or much larger 
than the current latest version [%d].Are there other clients running in 
parallel with the current task?",
+nextVersion, findVersionWithOutVersionHint(fs));
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, src, dst);
 } finally {
   if (!lockManager.release(dst.toString(), src.toString())) {
 LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
+  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+return Util.getFs(path, hadoopConf);
+  }
+
+  @VisibleForTesting
+  boolean checkMetaDataFileRenameSuccess(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws 
IOException {
+return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile);
+  }
+
+  @VisibleForTesting
+  boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path 
finalMetaDataFile)
+  throws IOException {
+return fs.rename(tempMetaDataFile, finalMetaDataFile);
+  }
+
+  private boolean renameCheck(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable 
rootError) {
 try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
+  return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, 
finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error(
+  "No correctness check can be performed.src=[{}],dst=[{}].",
+  tempMetaDataFile,
+  finalMetaDataFile,
+  e);
+  String msg =
+  String.format(
+  "Exception thrown when renaming [%s] to [%s].Also no correctness 
check can be performed.",
+  tempMetaDataFile, finalMetaDataFile);
+  throw new CommitStateUnknownException(msg, rootError != null ? rootError 
: e);
 }
   }
 
-  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
-return Util.getFs(path, hadoopConf);
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {

Review Comment:
   > I think it is best to be more specific here:
   
   
   @Fokko 
   This is a crucial step, so I think we should widen the scope of the exception detection: at this point we may also encounter OOM errors and the like, and detecting only IOExceptions would miss boundary cases such as runtime exceptions.
   Our intention is that whatever we encounter in this step, we can throw a CommitStateUnknownException to ensure that we don't trigger other actions that affect the correctness of the data, such as file cleanup.
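   To make the intent concrete, here is a minimal, hedged sketch of the caller-side contract being described (an illustration, not the PR's code; `attemptRename` and `cleanUpTempFile` are hypothetical helpers): cleanup may only run when the commit is known to have failed.

```java
void commitVersion(FileSystem fs, Path src, Path dst) {
  try {
    attemptRename(fs, src, dst); // the atomic commit step (hypothetical helper)
  } catch (CommitFailedException e) {
    cleanUpTempFile(fs, src); // safe: the new metadata was definitely not committed
    throw e;
  } catch (RuntimeException | Error e) {
    // Outcome unknown: the rename may have succeeded server-side, so deleting
    // src or dst here could corrupt an already-committed table. Skip all cleanup.
    throw new CommitStateUnknownException(e);
  }
}
```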



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] [Bug]: Can't create/update tables in REST nessie catalog via Spark. Iceberg Cannot read field "formatVersion" because "x0" is null [iceberg]

2024-06-24 Thread via GitHub


ajantha-bhat commented on issue #10533:
URL: https://github.com/apache/iceberg/issues/10533#issuecomment-2186193549

   Yeah, we will check it. 
   
   cc: @dimas-b, @snazy


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650769239


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -159,18 +160,30 @@ public void commit(TableMetadata base, TableMetadata 
metadata) {
 int nextVersion = (current.first() != null ? current.first() : 0) + 1;
 Path finalMetadataFile = metadataFilePath(nextVersion, codec);
 FileSystem fs = getFileSystem(tempMetadataFile, conf);
-
-// this rename operation is the atomic commit operation
-renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion);
-
-LOG.info("Committed a new metadata file {}", finalMetadataFile);
-
-// update the best-effort version pointer
-writeVersionHint(nextVersion);
-
-deleteRemovedMetadataFiles(base, metadata);
-
-this.shouldRefresh = true;
+boolean versionCommitSuccess = false;
+try {
+  fs.delete(versionHintFile(), false /* recursive delete*/);

Review Comment:
   @Fokko 
   >How would that work? I think a race condition cannot be avoided, where it 
would pass 99.99% of the time, but it can be that they overwrite each other 
because they don't know that multiple processes are writing to the table.
   
   For filesystems that support atomic rename, concurrent renames do not overwrite each other; object storage, on the other hand, does not support this. Because the operation is atomic, each client knows whether its own rename succeeded.
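   As a hedged illustration of the compare-and-swap behavior described here (a sketch assuming an HDFS-style filesystem with atomic rename, not the PR's code):

```java
// At most one of two clients racing to publish the same version wins: rename is
// atomic and fails (returns false) if the destination already exists, so each
// client learns locally whether its commit succeeded, and the loser can retry
// against the next version number.
boolean tryCommitVersion(FileSystem fs, Path tempMetadata, Path finalMetadata)
    throws IOException {
  return fs.rename(tempMetadata, finalMetadata);
}
```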



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650772614


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -159,18 +160,30 @@ public void commit(TableMetadata base, TableMetadata 
metadata) {
 int nextVersion = (current.first() != null ? current.first() : 0) + 1;
 Path finalMetadataFile = metadataFilePath(nextVersion, codec);
 FileSystem fs = getFileSystem(tempMetadataFile, conf);
-
-// this rename operation is the atomic commit operation
-renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion);
-
-LOG.info("Committed a new metadata file {}", finalMetadataFile);
-
-// update the best-effort version pointer
-writeVersionHint(nextVersion);
-
-deleteRemovedMetadataFiles(base, metadata);
-
-this.shouldRefresh = true;
+boolean versionCommitSuccess = false;
+try {
+  fs.delete(versionHintFile(), false /* recursive delete*/);

Review Comment:
   > That is correct, but it is also highly coupled to the Iceberg project 
since Iceberg optimizes for object stores. Object stores are terrible at 
listing files since they are often paged responses. These are both slow and 
costly, therefore the pointer in the `versionHintFile` makes this much faster.
   
   @Fokko 
   Since the current implementation of the filesystem catalog is based on the assumption that the file system supports atomic operations, it cannot be used for object storage as-is. Either we provide another implementation, or we proxy through CNCF middleware that can provide atomic-operation semantics (for example, CubeFS).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Fix duplicate data in Flink's upsert writer for format V2 [iceberg]

2024-06-24 Thread via GitHub


pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650849668


##
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##
@@ -426,30 +425,44 @@ private void commitOperation(
   }
 
   @Override
-  public void processElement(StreamRecord element) {
-this.writeResultsOfCurrentCkpt.add(element.getValue());
+  public void processElement(StreamRecord element) {
+FlinkWriteResult flinkWriteResult = element.getValue();
+List writeResults =
+writeResultsSinceLastSnapshot.computeIfAbsent(
+flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+writeResults.add(flinkWriteResult.writeResult());
   }
 
   @Override
   public void endInput() throws IOException {
 // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-long currentCheckpointId = Long.MAX_VALUE;
-dataFilesPerCheckpoint.put(currentCheckpointId, 
writeToManifest(currentCheckpointId));
-writeResultsOfCurrentCkpt.clear();
-
+long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+writeToManifestSinceLastSnapshot(currentCheckpointId);
 commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, 
currentCheckpointId);
   }
 
+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws 
IOException {
+if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+  dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+}
+
+for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
+writeResultsSinceLastSnapshot.entrySet()) {
+  dataFilesPerCheckpoint.put(

Review Comment:
   Is this a scenario that could happen?
   - Writer 1 writes data
   - Writer 2 writes data
   - Writer 1 prepareSnapshotPreBarrier for Checkpoint 1, File 1-1 with 
checkpoint 1 created
   - Committer receives File 1-1
   - Writer 2 prepareSnapshotPreBarrier for Checkpoint 1, File 1-2 with 
checkpoint 1 created
   - Committer receives File 1-2
   - Writer 1 writes data
   - Writer 2 writes data
   - Writer 1 prepareSnapshotPreBarrier for Checkpoint 2, File 2-1 with 
checkpoint 2 created
   - Committer receives File 2-1
   - Committer starts snapshotState for Checkpoint 1 - at this stage `writeResultsSinceLastSnapshot` holds more data, but not all of it is ready to commit. 
   - Committer receives File 2-1
   - Committer starts snapshotState for Checkpoint 2
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Fix duplicate data in Flink's upsert writer for format V2 [iceberg]

2024-06-24 Thread via GitHub


pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650851730


##
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##
@@ -426,30 +425,44 @@ private void commitOperation(
   }
 
   @Override
-  public void processElement(StreamRecord element) {
-this.writeResultsOfCurrentCkpt.add(element.getValue());
+  public void processElement(StreamRecord element) {
+FlinkWriteResult flinkWriteResult = element.getValue();
+List writeResults =
+writeResultsSinceLastSnapshot.computeIfAbsent(
+flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+writeResults.add(flinkWriteResult.writeResult());
   }
 
   @Override
   public void endInput() throws IOException {
 // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-long currentCheckpointId = Long.MAX_VALUE;
-dataFilesPerCheckpoint.put(currentCheckpointId, 
writeToManifest(currentCheckpointId));
-writeResultsOfCurrentCkpt.clear();
-
+long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+writeToManifestSinceLastSnapshot(currentCheckpointId);
 commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, 
currentCheckpointId);
   }
 
+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws 
IOException {
+if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {

Review Comment:
   What if we don't have data for the last checkpoint, but we have data for the 
previous checkpoints?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[PR] Refactored data clumps with the help of LLMs (research project) [iceberg]

2024-06-24 Thread via GitHub


compf opened a new pull request, #10558:
URL: https://github.com/apache/iceberg/pull/10558

   
   Hello maintainers,
   
   I am conducting a master thesis project focused on enhancing code quality 
through automated refactoring of data clumps, assisted by Large Language Models 
(LLMs).
   
   
   
   Data clump definition:
   A data clump exists if
   1. two methods (in the same or in different classes) have at least 3 common 
parameters and one of those methods does not override the other,  or 
   2. At least three fields in a class are common with the parameters of a 
method (in the same or in a different class), or
   3. Two different classes have at least three common fields
 
   See also the following UML diagram as an example
   ![Example data 
clump](https://raw.githubusercontent.com/compf/data_clump_eval_assets/main/data_clump_explained.svg)
   
   
   
   I believe these refactorings can contribute to the project by reducing complexity and enhancing the readability of your source code.
   
   Pursuant to the EU AI Act, I fully disclose the use of LLMs in generating 
these refactorings, emphasizing that all changes have undergone human review 
for quality assurance. 
   
   
   Even if you decide not to integrate my changes into your codebase (which is perfectly fine), I ask you to fill out a feedback survey, which will be scientifically evaluated to determine the acceptance of AI-supported refactorings. You can find the feedback survey at 
https://campus.lamapoll.de/Data-clump-refactoring/en
   
   
   Thank you for considering my contribution. I look forward to your feedback. 
If you have any other questions or comments, feel free to write a comment, or 
email me under tschoema...@uni-osnabrueck.de .
   
   
   Best regards,
   Timo Schoemaker
   Department of Computer Science
   University of OsnabrΓΌck
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] refactor: Remove dependency of tokio. [iceberg-rust]

2024-06-24 Thread via GitHub


Xuanwo commented on issue #418:
URL: https://github.com/apache/iceberg-rust/issues/418#issuecomment-2186329846

   Let me do this. I feel like it's a good win to remove the direct dep on tokio.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Fix duplicate data in Flink's upsert writer for format V2 [iceberg]

2024-06-24 Thread via GitHub


zhongqishang commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650864804


##
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##
@@ -426,30 +425,44 @@ private void commitOperation(
   }
 
   @Override
-  public void processElement(StreamRecord element) {
-this.writeResultsOfCurrentCkpt.add(element.getValue());
+  public void processElement(StreamRecord element) {
+FlinkWriteResult flinkWriteResult = element.getValue();
+List writeResults =
+writeResultsSinceLastSnapshot.computeIfAbsent(
+flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+writeResults.add(flinkWriteResult.writeResult());
   }
 
   @Override
   public void endInput() throws IOException {
 // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-long currentCheckpointId = Long.MAX_VALUE;
-dataFilesPerCheckpoint.put(currentCheckpointId, 
writeToManifest(currentCheckpointId));
-writeResultsOfCurrentCkpt.clear();
-
+long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+writeToManifestSinceLastSnapshot(currentCheckpointId);
 commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, 
currentCheckpointId);
   }
 
+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws 
IOException {
+if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {

Review Comment:
   If the last checkpoint has no results, it is set to `EMPTY_MANIFEST_DATA`; then the following logic runs, looping over `writeResultsSinceLastSnapshot` to generate the manifest data corresponding to each checkpoint id.
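   For readers following along, a hedged sketch of the control flow being described (generic parameters are restored by hand since the mail archive strips angle brackets, and the exact `writeToManifest` signature is truncated in the quoted diff, so the two-argument form and the final `clear()` are assumptions):

```java
private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
  // If the current checkpoint produced no write results, still record an empty
  // manifest so commitUpToCheckpoint can advance past this checkpoint id.
  if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
    dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
  }

  // Flush the buffered results of every checkpoint seen since the last
  // successful snapshot, keyed by the checkpoint id each result was written under.
  for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
    dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
  }

  writeResultsSinceLastSnapshot.clear();
}
```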



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] DynFields, DynMethods code cleanup [iceberg]

2024-06-24 Thread via GitHub


findepi commented on code in PR #10543:
URL: https://github.com/apache/iceberg/pull/10543#discussion_r1650894264


##
common/src/main/java/org/apache/iceberg/common/DynMethods.java:
##
@@ -161,6 +161,8 @@ private BoundMethod(UnboundMethod method, Object receiver) {
   this.receiver = receiver;
 }
 
+/** @deprecated since 1.6.0, will be removed in 1.7.0 */

Review Comment:
   My thinking is that any variant needed in the future is very easy to add when needed, which doesn't necessitate keeping currently dead code around. Removing dead code clears the picture and opens eyes to further simplifications. Does this make sense? BTW, recently I was staring at a pretty complex class that was negatively impacted by some other changes, only to eventually realize it's used only in its own tests and can simply be deleted. What a relief. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Improve MetricsReporter loading with class loader fallback [iceberg]

2024-06-24 Thread via GitHub


findepi commented on code in PR #10459:
URL: https://github.com/apache/iceberg/pull/10459#discussion_r1650899387


##
core/src/test/resources/com/example/target/custom-metrics-reporter-1.0-SNAPSHOT.jar:
##


Review Comment:
   You can create a new classloader with e.g. `new URLClassLoader`, which doesn't require adding a binary jar to the repo. You can follow the example of https://github.com/trinodb/trino/blob/c0276cea688da155f7f5bf0cb0b53169ece43e6e/core/trino-main/src/main/java/io/trino/sql/gen/IsolatedClass.java#L32 to see how to populate such a classloader with a class that is compiled normally and is part of your classpath.
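   A hedged sketch of that idea (child-first loading of bytecode that is already on the test classpath, in the spirit of the linked IsolatedClass; this is an illustration, not code from either repo):

```java
import java.io.InputStream;

// Defines `clazz` inside a throwaway child-first loader, so the returned Class
// object is resolvable only through that loader; no binary jar in the repo.
static Class<?> isolate(Class<?> clazz) throws Exception {
  String resource = clazz.getName().replace('.', '/') + ".class";
  byte[] bytecode;
  try (InputStream in = clazz.getClassLoader().getResourceAsStream(resource)) {
    bytecode = in.readAllBytes();
  }
  ClassLoader childFirst =
      new ClassLoader(clazz.getClassLoader()) {
        @Override
        public Class<?> loadClass(String name) throws ClassNotFoundException {
          if (name.equals(clazz.getName())) {
            Class<?> loaded = findLoadedClass(name);
            return loaded != null ? loaded : defineClass(name, bytecode, 0, bytecode.length);
          }
          return super.loadClass(name); // interfaces still come from the parent
        }
      };
  return childFirst.loadClass(clazz.getName());
}
```

   Because interfaces are still resolved through the parent loader, an instance of the isolated class can be cast to a shared interface such as `MetricsReporter`, which seems to be what the fallback test needs.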



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Race condition in CachingCatalog [iceberg]

2024-06-24 Thread via GitHub


findepi commented on issue #10493:
URL: https://github.com/apache/iceberg/issues/10493#issuecomment-2186420099

   @singhpk234 indeed this would alleviate the concern, but I don't see a way to implement caching in the `Table loadTable(TableIdentifier)` method. We don't have the snapshot ID yet, and once we have it (after delegating the load), there is no way to use the cache.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Add configurable cache expiry to caching catalog [iceberg]

2024-06-24 Thread via GitHub


findepi commented on code in PR #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r1650929394


##
core/src/main/java/org/apache/iceberg/CachingCatalog.java:
##
@@ -29,24 +34,91 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps an Iceberg Catalog to cache tables.
+ * 
+ * See {@link CatalogProperties#CACHE_EXPIRATION_INTERVAL_MS} for more details
+ * regarding special values for {@code expirationIntervalMillis}.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> identLoggingRemovalListener =
+  (key, value, cause) -> LOG.debug("Evicted {} from the table cache ({})", 
key, cause);
+
   public static Catalog wrap(Catalog catalog) {
-return wrap(catalog, true);
+return wrap(catalog, CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+return wrap(catalog, true, expirationIntervalMillis);
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
+Preconditions.checkArgument(expirationIntervalMillis != 0,
+"When %s is set to 0, the catalog cache should be disabled. This 
indicates a bug.",
+CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
 this.catalog = catalog;
 this.caseSensitive = caseSensitive;
+this.expirationIntervalMillis = expirationIntervalMillis;
+this.tableCache = createTableCache(ticker);
+  }
+
+  /**
+   * CacheWriter class for removing metadata tables when their associated data 
table is expired
+   * via cache expiration.
+   */
+  class MetadataTableInvalidatingCacheWriter implements CacheWriter<TableIdentifier, Table> {
+@Override
+public void write(TableIdentifier tableIdentifier, Table table) {
+}
+
+@Override
+public void delete(TableIdentifier tableIdentifier, Table table, 
RemovalCause cause) {
+  if (RemovalCause.EXPIRED.equals(cause)) {
+if (!MetadataTableUtils.hasMetadataTableName(tableIdentifier)) {
+  tableCache.invalidateAll(metadataTableIdentifiers(tableIdentifier));
+}

Review Comment:
   This obviously races with the cache puts done in `org.apache.iceberg.CachingCatalog#loadTable`.
   It would be great to have a code comment explaining to the reader why that's not a problem.
   Is the thinking that this is not required for correctness and therefore does not have to be 100% accurate?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] AWS: Add FileIO tracker/closer to Glue catalog [iceberg]

2024-06-24 Thread via GitHub


findepi commented on code in PR #8315:
URL: https://github.com/apache/iceberg/pull/8315#discussion_r1650952000


##
aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java:
##
@@ -619,4 +635,17 @@ public void setConf(Configuration conf) {
   protected Map properties() {
 return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }
+
+  private Cache<TableOperations, FileIO> newFileIOCloser() {
+return Caffeine.newBuilder()
+.weakKeys()
+.removalListener(
+(RemovalListener<TableOperations, FileIO>)
+(ops, fileIO, cause) -> {
+  if (null != fileIO) {
+fileIO.close();

Review Comment:
   This is not a regular cache; this could be better served with 
`java.lang.ref.Cleaner`.
   However, neither of these addresses the main problem: we use the Java GC as a proxy to discover and close resource leaks. I understand that FileIO instances can hold on to resources that require closing (like file descriptors or sockets inside the AWS S3 client). If I have an abundance of memory, the weakly held instances may not get collected, so the app can run out of descriptors. Can you please help me understand why this isn't a concern?
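   For reference, a minimal sketch of the `Cleaner` alternative (an illustration only; the same caveat applies, since cleanup still runs only after the referent becomes unreachable):

```java
import java.lang.ref.Cleaner;
import org.apache.iceberg.io.FileIO;

class FileIOTracker {
  private static final Cleaner CLEANER = Cleaner.create();

  // Closes `io` some time after `owner` becomes unreachable. The cleanup action
  // must not capture `owner` itself (that would keep it reachable forever);
  // capturing only the FileIO is safe.
  static void track(Object owner, FileIO io) {
    CLEANER.register(owner, io::close);
  }
}
```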
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] DynConstructors cleanup [iceberg]

2024-06-24 Thread via GitHub


nastra commented on code in PR #10542:
URL: https://github.com/apache/iceberg/pull/10542#discussion_r1650975795


##
common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+public class TestDynConstructors {
+  @Test
+  public void testImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder().impl(MyClass.class).buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+.impl("org.apache.iceberg.common.TestDynConstructors$MyClass")
+.buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceWrongImplString() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+// TODO this should throw, since the MyUnrelatedClass does not 
implement MyInterface

Review Comment:
   still a TODO left here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] DynConstructors cleanup [iceberg]

2024-06-24 Thread via GitHub


nastra commented on code in PR #10542:
URL: https://github.com/apache/iceberg/pull/10542#discussion_r1650976468


##
common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+public class TestDynConstructors {
+  @Test
+  public void testImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder().impl(MyClass.class).buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+.impl("org.apache.iceberg.common.TestDynConstructors$MyClass")
+.buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceWrongImplString() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+// TODO this should throw, since the MyUnrelatedClass does not 
implement MyInterface
+
.impl("org.apache.iceberg.common.TestDynConstructors$MyUnrelatedClass")
+.buildChecked();
+assertThatThrownBy(ctor::newInstance)
+.isInstanceOf(ClassCastException.class)
+.hasMessage(
+"org.apache.iceberg.common.TestDynConstructors$MyUnrelatedClass 
cannot be cast to org.apache.iceberg.common.TestDynConstructors$MyInterface");
+  }
+
+  @Test
+  public void testInterfaceWrongImplClass() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+// TODO this should throw or not compile at all, since the 
MyUnrelatedClass does not

Review Comment:
   TODO left here. Are you planning to address this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] DynConstructors cleanup [iceberg]

2024-06-24 Thread via GitHub


findepi commented on code in PR #10542:
URL: https://github.com/apache/iceberg/pull/10542#discussion_r1651007712


##
common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+public class TestDynConstructors {
+  @Test
+  public void testImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder().impl(MyClass.class).buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+.impl("org.apache.iceberg.common.TestDynConstructors$MyClass")
+.buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceWrongImplString() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+// TODO this should throw, since the MyUnrelatedClass does not 
implement MyInterface

Review Comment:
   This is a reflection of the current state of the API, a pre-existing problem.
   I can remove this TODO if that helps. I did not plan to address it as part of this cleanup PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [I] Field comments are not written for timestamp field [iceberg]

2024-06-24 Thread via GitHub


ariksa1 commented on issue #4212:
URL: https://github.com/apache/iceberg/issues/4212#issuecomment-2186546553

   Hi, is there any update regarding this issue? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] DynConstructors cleanup [iceberg]

2024-06-24 Thread via GitHub


findepi commented on code in PR #10542:
URL: https://github.com/apache/iceberg/pull/10542#discussion_r1651013839


##
common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+public class TestDynConstructors {
+  @Test
+  public void testImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder().impl(MyClass.class).buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceImplNewInstance() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+.impl("org.apache.iceberg.common.TestDynConstructors$MyClass")
+.buildChecked();
+assertThat(ctor.newInstance()).isInstanceOf(MyClass.class);
+  }
+
+  @Test
+  public void testInterfaceWrongImplString() throws Exception {
+DynConstructors.Ctor ctor =
+DynConstructors.builder(MyInterface.class)
+// TODO this should throw, since the MyUnrelatedClass does not 
implement MyInterface

Review Comment:
   BTW, my thinking about TODOs in tests is this: a test should prevent undesired behavioral changes of the implementation. If I write a test exercising a specific behavior and I don't think a change of that behavior should be prevented, I put in a comment like this one. If I don't put the comment in, someone may treat the test as asserting something important and thus give up an otherwise good change.
   I understand this is a bit convoluted, but that's a lesson learned from Trino about how contributors sometimes react to test failures. I've seen test failures cause otherwise good code to become less good, because the contributor considered modifying the test not an option.
   
   Alternatively, I could leave the test out, but I am not convinced that is a better option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651047933


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+throw new CommitFailedException(
+"Cannot commit version [%d] because it is smaller or much larger 
than the current latest version [%d].Are there other clients running in 
parallel with the current task?",
+nextVersion, findVersionWithOutVersionHint(fs));
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, src, dst);
 } finally {
   if (!lockManager.release(dst.toString(), src.toString())) {
 LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
+  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+return Util.getFs(path, hadoopConf);
+  }
+
+  @VisibleForTesting
+  boolean checkMetaDataFileRenameSuccess(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws 
IOException {
+return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile);
+  }
+
+  @VisibleForTesting
+  boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path 
finalMetaDataFile)
+  throws IOException {
+return fs.rename(tempMetaDataFile, finalMetaDataFile);
+  }
+
+  private boolean renameCheck(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable 
rootError) {
 try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
+  return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, 
finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error(
+  "No correctness check can be performed.src=[{}],dst=[{}].",
+  tempMetaDataFile,
+  finalMetaDataFile,
+  e);
+  String msg =
+  String.format(
+  "Exception thrown when renaming [%s] to [%s].Also no correctness 
check can be performed.",
+  tempMetaDataFile, finalMetaDataFile);
+  throw new CommitStateUnknownException(msg, rootError != null ? rootError 
: e);
 }
   }
 
-  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
-return Util.getFs(path, hadoopConf);
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {

Review Comment:
   @Fokko Sir, thanks for pointing out the problem here; I've made this part of the exception handling a bit more detailed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651072649


##
core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java:
##
@@ -206,6 +210,139 @@ public void testFailedCommit() throws Exception {
 Assertions.assertThat(manifests).as("Should contain 0 Avro manifest 
files").isEmpty();
   }
 
+  @Test
+  public void testCommitFailedBeforeChangeVersionHint() throws IOException {
+table.newFastAppend().appendFile(FILE_A).commit();
+BaseTable baseTable = (BaseTable) table;
+HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
+
+HadoopTableOperations spyOps2 = spy(tableOperations);
+doReturn(1).when(spyOps2).findVersionWithOutVersionHint(any());
+TableMetadata metadataV1 = spyOps2.current();
+SortOrder dataSort = 
SortOrder.builderFor(baseTable.schema()).asc("data").build();
+TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort);
+assertThatThrownBy(() -> spyOps2.commit(metadataV1, metadataV2))
+.isInstanceOf(CommitFailedException.class)
+.hasMessageContaining("Are there other clients running in parallel 
with the current task?");
+
+HadoopTableOperations spyOps3 = spy(tableOperations);
+doReturn(false).when(spyOps3).nextVersionIsLatest(any(), any());
+assertCommitNotChangeVersion(
+baseTable,
+spyOps3,
+CommitFailedException.class,
+"Are there other clients running in parallel with the current task?");
+
+HadoopTableOperations spyOps4 = spy(tableOperations);
+doThrow(new RuntimeException("FileSystem crash!"))
+.when(spyOps4)
+.renameMetaDataFileAndCheck(any(), any(), any());
+assertCommitNotChangeVersion(
+baseTable, spyOps4, CommitFailedException.class, "FileSystem crash!");
+  }
+
+  @Test
+  public void testCommitFailedAndCheckFailed() throws IOException {
+table.newFastAppend().appendFile(FILE_A).commit();
+BaseTable baseTable = (BaseTable) table;
+HadoopTableOperations tableOperations = (HadoopTableOperations) 
baseTable.operations();
+HadoopTableOperations spyOps = spy(tableOperations);
+doThrow(new RuntimeException("FileSystem crash!"))

Review Comment:
   @Fokko done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651082514


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {

Review Comment:
   @Fokko For object stores, fs.rename may leave an incomplete dst file, because many object stores implement the rename operation as a write (copy). As a result, none of the logic we have implemented in HadoopTableOperations is really applicable to object stores. 
   For filesystems that support atomic operations, the list operation should take an acceptable amount of time as long as the number of metadata files is not very large.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651099384


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {

Review Comment:
   @Fokko 
   Also, I admit this check does not solve the problem in all scenarios at the moment; a better approach would be to call fs.rename and, after it succeeds, verify that the commit is intact. This check is just intended to fail the client as soon as possible, because many clients run Flink/Kafka Streams writers against the table at the same time as the compaction task. Failing fast is friendlier to them.
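   To illustrate the fast-fail guard (a hedged sketch; `findNewestVersion` is a hypothetical stand-in for the PR's listing-based `findVersionWithOutVersionHint`):

```java
// Fail early when another client has already published a newer version: the
// upcoming rename could only lose that race, so surface a clean
// CommitFailedException now instead of an ambiguous rename failure later.
boolean nextVersionIsLatest(int nextVersion, FileSystem fs) throws IOException {
  int newestOnDisk = findNewestVersion(fs); // e.g. max N over vN.metadata.json files
  return nextVersion == newestOnDisk + 1;
}
```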



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651106651


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -355,71 +475,90 @@ int findVersion() {
* an attempt will be made to delete the source file.
*
* @param fs the filesystem used for the rename
-   * @param src the source file
-   * @param dst the destination file
+   * @param tempMetaDataFile the source file
+   * @param finalMetaDataFile the destination file
*/
-  private void renameToFinal(FileSystem fs, Path src, Path dst, int 
nextVersion) {
+  @VisibleForTesting
+  boolean commitNewVersion(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer 
nextVersion)
+  throws IOException {
 try {
-  if (!lockManager.acquire(dst.toString(), src.toString())) {
+  if (!lockManager.acquire(finalMetaDataFile.toString(), 
finalMetaDataFile.toString())) {
 throw new CommitFailedException(
-"Failed to acquire lock on file: %s with owner: %s", dst, src);
+"Failed to acquire lock on file: %s with owner: %s",
+finalMetaDataFile, tempMetaDataFile);
   }
 
-  if (fs.exists(dst)) {
-throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
-  }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
+  if (fs.exists(finalMetaDataFile)) {
+throw new CommitFailedException(
+"Version %d already exists: %s", nextVersion, finalMetaDataFile);
   }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  // maybe too heavy.?
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+// In the case of concurrent execution,
+// verify that the version that is ready to be committed at a time is 
the latest version.
+throw new CommitFailedException("Version %d too old: %s", nextVersion, 
finalMetaDataFile);
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, tempMetaDataFile, 
finalMetaDataFile);
 } finally {
-  if (!lockManager.release(dst.toString(), src.toString())) {
-LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
+  if (!lockManager.release(finalMetaDataFile.toString(), 
finalMetaDataFile.toString())) {
+LOG.warn(
+"Failed to release lock on file: {} with owner: {}",
+finalMetaDataFile,
+tempMetaDataFile);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
-try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
-}
+  private void cleanUncommittedMeta(Path src) {
+io().deleteFile(src.toString());
   }
 
   protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
 return Util.getFs(path, hadoopConf);
   }
 
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  // The most important step. There must be no mistakes in this step.
+  // Even if it does, we should stop everything.
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error("There were some problems with submitting the new version.", 
e);
+  try {
+if (newMetadataExists(fs, finalMetaDataFile) && 
!tempMetadataExists(fs, tempMetaDataFile)) {
+  return true;
+} else {
+  throw new CommitFailedException(e);
+}
+  } catch (CommitFailedException e1) {
+throw e1;
+  } catch (Throwable e2) {
+throw new CommitStateUnknownException(e2);
+  }
+}
+  }
+
+  @VisibleForTesting

Review Comment:
   @Fokko 
   Yes, but in practice many of the detailed scenarios are difficult to 
reproduce locally, so they are hard to simulate in a test. For example: 
halfway through a method call, every subsequent access to the filesystem 
suddenly starts throwing an exception. I don't know how to simulate this 
scenario.
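   
   One way this might be simulated is to mock the Hadoop FileSystem so that 
the rename itself fails. This is only a sketch, assuming Mockito and JUnit 5 
on the test classpath, with illustrative paths and class names:
   
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;
    
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.jupiter.api.Test;
    
    class RenameFailureSketchTest {
      @Test
      void renameThrowsMidCommit() throws IOException {
        // A mocked filesystem: all calls are stubbed, nothing touches disk.
        FileSystem fs = mock(FileSystem.class);
        Path src = new Path("/tmp/v2.metadata.json.tmp");
        Path dst = new Path("/tmp/v2.metadata.json");
    
        // Calls before the rename behave normally...
        when(fs.exists(dst)).thenReturn(false);
        // ...but the rename itself dies with an I/O error.
        when(fs.rename(src, dst)).thenThrow(new IOException("injected failure"));
    
        // The code under test would be invoked with this fs here, asserting
        // on the exception type that surfaces (CommitFailedException vs.
        // CommitStateUnknownException).
      }
    }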




Re: [PR] Flink: Maintenance - TriggerManager [iceberg]

2024-06-24 Thread via GitHub


stevenzwu commented on code in PR #10484:
URL: https://github.com/apache/iceberg/pull/10484#discussion_r1648221004


##
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JVMBasedLockFactory.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.concurrent.Semaphore;
+import org.apache.flink.annotation.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The locks are based on static {@link Semaphore} objects. We expect that the 
{@link
+ * TriggerManager} and the LockRemover operators will be placed on the same 
TaskManager (JVM), as
+ * they are both global. In this case JVM based locking should be enough to 
allow communication
+ * between the operators.
+ */
+@Internal
+public class JVMBasedLockFactory implements TriggerLockFactory {

Review Comment:
   nit: JVM seems redundant as this is Java code. maybe `SemaphoreLockFactory` 
or `InMemorySemaphoreLockFactory`?
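   
   For reference, a minimal sketch of the JVM-wide semaphore idea the Javadoc 
describes (class and method names here are illustrative, not the PR's actual 
TriggerLockFactory contract):
   
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Semaphore;
    
    // One static semaphore per lock id, shared by every operator scheduled
    // into the same JVM (TaskManager).
    public class InMemorySemaphoreLocks {
      private static final Map<String, Semaphore> LOCKS = new ConcurrentHashMap<>();
    
      // Non-blocking acquire; returns true if the lock was free.
      public static boolean tryLock(String id) {
        return LOCKS.computeIfAbsent(id, k -> new Semaphore(1)).tryAcquire();
      }
    
      // Release a lock acquired earlier by the paired operator.
      public static void unlock(String id) {
        Semaphore semaphore = LOCKS.get(id);
        if (semaphore != null) {
          semaphore.release();
        }
      }
    }
   
   This only holds while both operators live in the same JVM, which is exactly 
the placement assumption the Javadoc calls out.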



##
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** . */
+@Internal
+class TriggerManager extends KeyedProcessFunction
+implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List taskNames;
+  private final List evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List triggerCounters;
+  private transient ValueState nextEvaluationTime;
+  private transient ListState accumulatedChanges;
+  private transient ListState lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock re

Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651099384


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path 
dst, int nextVersion) {
   if (fs.exists(dst)) {
 throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
   }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
-  }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  if (!nextVersionIsLatest(nextVersion, fs)) {

Review Comment:
   @Fokko 
   Also, I admit this check does not solve the problem in all scenarios at the 
moment; a better approach would be to call fs.rename and, after it succeeds, 
verify that the commit is in a normal state (this is the part of the code I 
want to submit a PR for later). This check is just intended to fail the 
client as soon as possible, because many clients run flink/kafka-streams jobs 
that write to the table at the same time as the compaction task. Failing fast 
is friendlier to them.






Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651106651


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -355,71 +475,90 @@ int findVersion() {
* an attempt will be made to delete the source file.
*
* @param fs the filesystem used for the rename
-   * @param src the source file
-   * @param dst the destination file
+   * @param tempMetaDataFile the source file
+   * @param finalMetaDataFile the destination file
*/
-  private void renameToFinal(FileSystem fs, Path src, Path dst, int 
nextVersion) {
+  @VisibleForTesting
+  boolean commitNewVersion(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer 
nextVersion)
+  throws IOException {
 try {
-  if (!lockManager.acquire(dst.toString(), src.toString())) {
+  if (!lockManager.acquire(finalMetaDataFile.toString(), 
finalMetaDataFile.toString())) {
 throw new CommitFailedException(
-"Failed to acquire lock on file: %s with owner: %s", dst, src);
+"Failed to acquire lock on file: %s with owner: %s",
+finalMetaDataFile, tempMetaDataFile);
   }
 
-  if (fs.exists(dst)) {
-throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
-  }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
+  if (fs.exists(finalMetaDataFile)) {
+throw new CommitFailedException(
+"Version %d already exists: %s", nextVersion, finalMetaDataFile);
   }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  // maybe too heavy.?
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+// In the case of concurrent execution,
+// verify that the version that is ready to be committed at a time is 
the latest version.
+throw new CommitFailedException("Version %d too old: %s", nextVersion, 
finalMetaDataFile);
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, tempMetaDataFile, 
finalMetaDataFile);
 } finally {
-  if (!lockManager.release(dst.toString(), src.toString())) {
-LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
+  if (!lockManager.release(finalMetaDataFile.toString(), 
finalMetaDataFile.toString())) {
+LOG.warn(
+"Failed to release lock on file: {} with owner: {}",
+finalMetaDataFile,
+tempMetaDataFile);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
-try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
-}
+  private void cleanUncommittedMeta(Path src) {
+io().deleteFile(src.toString());
   }
 
   protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
 return Util.getFs(path, hadoopConf);
   }
 
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  // The most important step. There must be no mistakes in this step.
+  // Even if it does, we should stop everything.
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error("There were some problems with submitting the new version.", 
e);
+  try {
+if (newMetadataExists(fs, finalMetaDataFile) && 
!tempMetadataExists(fs, tempMetaDataFile)) {
+  return true;
+} else {
+  throw new CommitFailedException(e);
+}
+  } catch (CommitFailedException e1) {
+throw e1;
+  } catch (Throwable e2) {
+throw new CommitStateUnknownException(e2);
+  }
+}
+  }
+
+  @VisibleForTesting

Review Comment:
   @Fokko 
   Yes, but in practice many of the detailed scenarios are difficult to 
reproduce locally, so they are hard to simulate in a test. For example: 
halfway through a method call, every subsequent access to the filesystem 
suddenly starts throwing an exception. I don't know how to simulate this 
scenario.
   
   For example, in your example above, we mock a FileSystem object, but there 
are multiple calls to the FileSystem object in the testRenameWithFileSystem 
method, and the behaviour of the entire method is different aft
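   
   Per-call behaviour like that could be scripted with Mockito's sequential 
stubbing; a sketch, under the same mocked-FileSystem assumptions as above:
   
    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;
    
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    class SequentialFailureSketch {
      static FileSystem failAfterFirstCall() throws IOException {
        FileSystem fs = mock(FileSystem.class);
        // The first exists() call answers normally; every later call throws,
        // approximating a filesystem that dies partway through the method
        // under test.
        when(fs.exists(any(Path.class)))
            .thenReturn(true)
            .thenThrow(new IOException("filesystem went away"));
        return fs;
      }
    }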

Re: [PR] View Spec implementation [iceberg-rust]

2024-06-24 Thread via GitHub


c-thiel commented on code in PR #331:
URL: https://github.com/apache/iceberg-rust/pull/331#discussion_r1651128747


##
crates/iceberg/testdata/view_metadata/ViewMetadataV2Valid.json:
##
@@ -0,0 +1,58 @@
+{

Review Comment:
   Ok, thanks for the feedback @nastra. I'll add some more tests next week.






Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]

2024-06-24 Thread via GitHub


BsoBird commented on code in PR #9546:
URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651106651


##
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##
@@ -355,71 +475,90 @@ int findVersion() {
* an attempt will be made to delete the source file.
*
* @param fs the filesystem used for the rename
-   * @param src the source file
-   * @param dst the destination file
+   * @param tempMetaDataFile the source file
+   * @param finalMetaDataFile the destination file
*/
-  private void renameToFinal(FileSystem fs, Path src, Path dst, int 
nextVersion) {
+  @VisibleForTesting
+  boolean commitNewVersion(
+  FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer 
nextVersion)
+  throws IOException {
 try {
-  if (!lockManager.acquire(dst.toString(), src.toString())) {
+  if (!lockManager.acquire(finalMetaDataFile.toString(), 
finalMetaDataFile.toString())) {
 throw new CommitFailedException(
-"Failed to acquire lock on file: %s with owner: %s", dst, src);
+"Failed to acquire lock on file: %s with owner: %s",
+finalMetaDataFile, tempMetaDataFile);
   }
 
-  if (fs.exists(dst)) {
-throw new CommitFailedException("Version %d already exists: %s", 
nextVersion, dst);
-  }
-
-  if (!fs.rename(src, dst)) {
-CommitFailedException cfe =
-new CommitFailedException("Failed to commit changes using rename: 
%s", dst);
-RuntimeException re = tryDelete(src);
-if (re != null) {
-  cfe.addSuppressed(re);
-}
-throw cfe;
+  if (fs.exists(finalMetaDataFile)) {
+throw new CommitFailedException(
+"Version %d already exists: %s", nextVersion, finalMetaDataFile);
   }
-} catch (IOException e) {
-  CommitFailedException cfe =
-  new CommitFailedException(e, "Failed to commit changes using rename: 
%s", dst);
-  RuntimeException re = tryDelete(src);
-  if (re != null) {
-cfe.addSuppressed(re);
+  // maybe too heavy.?
+  if (!nextVersionIsLatest(nextVersion, fs)) {
+// In the case of concurrent execution,
+// verify that the version that is ready to be committed at a time is 
the latest version.
+throw new CommitFailedException("Version %d too old: %s", nextVersion, 
finalMetaDataFile);
   }
-  throw cfe;
+  return renameMetaDataFileAndCheck(fs, tempMetaDataFile, 
finalMetaDataFile);
 } finally {
-  if (!lockManager.release(dst.toString(), src.toString())) {
-LOG.warn("Failed to release lock on file: {} with owner: {}", dst, 
src);
+  if (!lockManager.release(finalMetaDataFile.toString(), 
finalMetaDataFile.toString())) {
+LOG.warn(
+"Failed to release lock on file: {} with owner: {}",
+finalMetaDataFile,
+tempMetaDataFile);
   }
 }
   }
 
-  /**
-   * Deletes the file from the file system. Any RuntimeException will be 
caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
-try {
-  io().deleteFile(path.toString());
-  return null;
-} catch (RuntimeException re) {
-  return re;
-}
+  private void cleanUncommittedMeta(Path src) {
+io().deleteFile(src.toString());
   }
 
   protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
 return Util.getFs(path, hadoopConf);
   }
 
+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, 
Path finalMetaDataFile) {
+try {
+  // The most important step. There must be no mistakes in this step.
+  // Even if it does, we should stop everything.
+  return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+} catch (Throwable e) {
+  LOG.error("There were some problems with submitting the new version.", 
e);
+  try {
+if (newMetadataExists(fs, finalMetaDataFile) && 
!tempMetadataExists(fs, tempMetaDataFile)) {
+  return true;
+} else {
+  throw new CommitFailedException(e);
+}
+  } catch (CommitFailedException e1) {
+throw e1;
+  } catch (Throwable e2) {
+throw new CommitStateUnknownException(e2);
+  }
+}
+  }
+
+  @VisibleForTesting

Review Comment:
   @Fokko 
   Yes, but in practice many of the detailed scenarios are difficult to 
reproduce locally, so they are hard to simulate in a test. For example: 
halfway through a method call, every subsequent access to the filesystem 
suddenly starts throwing an exception. I don't know how to simulate this 
scenario.
   
   For example, in your example above, we mock a FileSystem object, but there 
are multiple calls to the FileSystem object in the testRenameWithFileSystem 
method, and each call to the FileSystem object that throws an e