Re: [PR] build(deps): bump github.com/apache/arrow/go/v16 from 16.0.0 to 16.1.0 [iceberg-go]
Fokko merged PR #97: URL: https://github.com/apache/iceberg-go/pull/97
Re: [PR] build(deps): bump github.com/hamba/avro/v2 from 2.21.1 to 2.22.1 [iceberg-go]
Fokko merged PR #94: URL: https://github.com/apache/iceberg-go/pull/94
Re: [PR] build(deps): bump github.com/pterm/pterm from 0.12.78 to 0.12.79 [iceberg-go]
Fokko merged PR #93: URL: https://github.com/apache/iceberg-go/pull/93
Re: [PR] Add the build from source section [iceberg-go]
Fokko merged PR #70: URL: https://github.com/apache/iceberg-go/pull/70
Re: [PR] Add the build from source section [iceberg-go]
Fokko commented on PR #70: URL: https://github.com/apache/iceberg-go/pull/70#issuecomment-2185823387 Thanks for the PR @git-hulk, and @zeroshade for the review!
Re: [PR] Metadata Log Entries metadata table [iceberg-python]
HonahX commented on code in PR #667: URL: https://github.com/apache/iceberg-python/pull/667#discussion_r1650524201 ## pyiceberg/table/snapshots.py: ## @@ -226,7 +226,8 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) -sequence_number: Optional[int] = Field(alias="sequence-number", default=None) +# cannot import `INITIAL_SEQUENCE_NUMBER` due to circular import +sequence_number: Optional[int] = Field(alias="sequence-number", default=0) Review Comment: @kevinjqliu Thanks for spotting this! We definitely need to read snapshot.sequence_number as 0 for v1. However, as we have observed in the test outcome, making `sequence_number` default to 0 here leads to `sequence_number=0` being written to version 1 table metadata's snapshots, which is not allowed by the spec: ``` Writing v1 metadata: Snapshot field sequence-number should not be written ``` I think we may need a new [field_serializer](https://docs.pydantic.dev/latest/api/functional_serializers/#pydantic.functional_serializers.field_serializer) in the `TableMetadataCommonFields` class or some other way to correct the behavior on write. WDYT? ## pyiceberg/table/__init__.py: ## @@ -3827,6 +3827,40 @@ def _partition_summaries_to_rows( schema=manifest_schema, ) +def metadata_log_entries(self) -> "pa.Table": +import pyarrow as pa + +from pyiceberg.table.snapshots import MetadataLogEntry + +table_schema = pa.schema([ +pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False), +pa.field("file", pa.string(), nullable=False), +pa.field("latest_snapshot_id", pa.int64(), nullable=True), +pa.field("latest_schema_id", pa.int32(), nullable=True), +pa.field("latest_sequence_number", pa.int64(), nullable=True), +]) + +def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]: +latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms) +return { +"timestamp": metadata_entry.timestamp_ms, +"file": metadata_entry.metadata_file, +"latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None, +"latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None, +"latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, +} + +# imitates `addPreviousFile` from Java +# https://github.com/apache/iceberg/blob/8248663a2a1ffddd2664ea37b45882455466f71c/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1450-L1451 +metadata_log_entries = self.tbl.metadata.metadata_log + [ +MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms) Review Comment: It seems this line acts more like https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66. Just curious about the reason you mention `addPreviousFile` here, which seems more relevant when we update the metadata_log during a table commit. BTW, this reminds me that currently non-REST catalogs do not update the metadata_log field during commit.
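For comparison, the write-side guard HonahX asks for would look like this minimal sketch in Jackson-style Java (in pyiceberg itself it would be a pydantic `field_serializer`); the `formatVersion` parameter and class name are invented for illustration, not pyiceberg or iceberg-java code:

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.StringWriter;

// Sketch: only emit sequence-number for format v2+, so v1 metadata never
// carries the field ("Snapshot field sequence-number should not be written").
public class SnapshotWriteGuard {

  static void writeSnapshot(
      JsonGenerator gen, int formatVersion, long snapshotId, long sequenceNumber)
      throws IOException {
    gen.writeStartObject();
    gen.writeNumberField("snapshot-id", snapshotId);
    if (formatVersion > 1) {
      // v2+: sequence numbers are meaningful and must be persisted
      gen.writeNumberField("sequence-number", sequenceNumber);
    }
    gen.writeEndObject();
  }

  public static void main(String[] args) throws IOException {
    StringWriter out = new StringWriter();
    try (JsonGenerator gen = new JsonFactory().createGenerator(out)) {
      writeSnapshot(gen, 1, 42L, 0L); // v1: sequence number reads as 0 but is not written
    }
    System.out.println(out); // {"snapshot-id":42}
  }
}
```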
Re: [PR] Encryption integration and test [iceberg]
hsiang-c commented on code in PR #5544: URL: https://github.com/apache/iceberg/pull/5544#discussion_r1650529269 ## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java: ## @@ -137,7 +163,47 @@ protected String tableName() { @Override public FileIO io() { -return fileIO; +if (encryptionManager == null) { + encryptionManager = encryption(); +} + +if (!encryptedTable) { + return fileIO; +} + +if (encryptingFileIO != null) { + return encryptingFileIO; +} + +encryptingFileIO = EncryptingFileIO.combine(fileIO, encryptionManager); +return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { +if (encryptionManager != null) { + return encryptionManager; +} + +if (encryptionKeyId == null) { + encryptionPropsFromHms(); Review Comment: This LGTM now. The previous approach (i.e. calling `dekLength()` in `BaseMetastoreTableOperations`) would only set `encryptionDekLength` on the write path (the `writeNewMetadataFile` method); therefore, Hive catalog readers get the default -1 value.
Re: [PR] feat(exprs): Adding BooleanExpressions and Predicates [iceberg-go]
Fokko commented on code in PR #91: URL: https://github.com/apache/iceberg-go/pull/91#discussion_r1650517065 ## exprs.go: ## @@ -0,0 +1,968 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "fmt" + + "github.com/google/uuid" +) + +//go:generate stringer -type=Operation -linecomment + +// Operation is an enum used for constants to define what operation a given +// expression or predicate is going to execute. +type Operation int + +const ( + // do not change the order of these enum constants. + // they are grouped for quick validation of operation type by + // using <= and >= of the first/last operation in a group + + OpTrue Operation = iota // True + OpFalse // False + // unary ops + OpIsNull // IsNull + OpNotNull // NotNull + OpIsNan // IsNaN + OpNotNan // NotNaN + // literal ops + OpLT// LessThan + OpLTEQ // LessThanEqual + OpGT// GreaterThan + OpGTEQ // GreaterThanEqual + OpEQ// Equal + OpNEQ // NotEqual + OpStartsWith// StartsWith + OpNotStartsWith // NotStartsWith + // set ops + OpIn// In + OpNotIn // NotIn + // boolean ops + OpNot // Not + OpAnd // And + OpOr // Or +) + +// Negate returns the inverse operation for a given op +func (op Operation) Negate() Operation { + switch op { + case OpIsNull: + return OpNotNull + case OpNotNull: + return OpIsNull + case OpIsNan: + return OpNotNan + case OpNotNan: + return OpIsNan + case OpLT: + return OpGTEQ + case OpLTEQ: + return OpGT + case OpGT: + return OpLTEQ + case OpGTEQ: + return OpLT + case OpEQ: + return OpNEQ + case OpNEQ: + return OpEQ + case OpIn: + return OpNotIn + case OpNotIn: + return OpIn + case OpStartsWith: + return OpNotStartsWith + case OpNotStartsWith: + return OpStartsWith + default: + panic("no negation for operation " + op.String()) + } +} + +// FlipLR returns the correct operation to use if the left and right operands +// are flipped. +func (op Operation) FlipLR() Operation { + switch op { + case OpLT: + return OpGT + case OpLTEQ: + return OpGTEQ + case OpGT: + return OpLT + case OpGTEQ: + return OpLTEQ + case OpAnd: + return OpAnd + case OpOr: + return OpOr + default: + panic("no left-right flip for operation: " + op.String()) + } +} + +// BooleanExpression represents a full expression which will evaluate to a +// boolean value such as GreaterThan or StartsWith, etc. 
+type BooleanExpression interface { + fmt.Stringer + Op() Operation + Negate() BooleanExpression + Equals(BooleanExpression) bool +} + +// AlwaysTrue is the boolean expression "True" +type AlwaysTrue struct{} + +func (AlwaysTrue) String() string{ return "AlwaysTrue()" } +func (AlwaysTrue) Op() Operation { return OpTrue } +func (AlwaysTrue) Negate() BooleanExpression { return AlwaysFalse{} } +func (AlwaysTrue) Equals(other BooleanExpression) bool { + _, ok := other.(AlwaysTrue) + return ok +} + +// AlwaysFalse is the boolean expression "False" +type AlwaysFalse struct{} + +func (AlwaysFalse) String() string{ return "AlwaysFalse()" } +func (AlwaysFalse) Op() Operation { return OpFalse } +func (AlwaysFalse) Negate() BooleanExpression { return AlwaysTrue{} } +func (AlwaysFalse) Equals(other BooleanExpression) bool { + _, ok := other.(AlwaysFalse) + return ok +} + +type NotExpr struct { + child BooleanExpression +} + +// NewNot creates a BooleanExpression representing a "Not" operation on the given +// argument. It will optimize slightly though: +// +// If t
Re: [PR] Build: Bump mkdocs-material from 9.5.26 to 9.5.27 [iceberg]
Fokko merged PR #10555: URL: https://github.com/apache/iceberg/pull/10555
Re: [PR] Build: Bump software.amazon.awssdk:bom from 2.26.3 to 2.26.7 [iceberg]
Fokko merged PR #10554: URL: https://github.com/apache/iceberg/pull/10554
Re: [PR] Build: Bump software.amazon.awssdk:bom from 2.26.3 to 2.26.7 [iceberg]
Fokko commented on PR #10554: URL: https://github.com/apache/iceberg/pull/10554#issuecomment-2185905602 Thanks for the review @singhpk234
Re: [PR] Build: Bump nessie from 0.90.4 to 0.91.1 [iceberg]
Fokko merged PR #10551: URL: https://github.com/apache/iceberg/pull/10551
Re: [PR] Build: Bump nessie from 0.90.4 to 0.91.1 [iceberg]
Fokko commented on PR #10551: URL: https://github.com/apache/iceberg/pull/10551#issuecomment-2185914294 Thanks for reviewing @ajantha-bhat
Re: [PR] Build: Bump org.roaringbitmap:RoaringBitmap from 1.0.6 to 1.1.0 [iceberg]
Fokko merged PR #10552: URL: https://github.com/apache/iceberg/pull/10552
Re: [PR] DynFields, DynMethods code cleanup [iceberg]
Fokko commented on code in PR #10543: URL: https://github.com/apache/iceberg/pull/10543#discussion_r1650614392 ## common/src/main/java/org/apache/iceberg/common/DynMethods.java: ## @@ -313,6 +317,8 @@ public Builder impl(Class targetClass, Class... argClasses) { return this; } +/** @deprecated since 1.6.0, will be removed in 1.7.0 */ +@Deprecated Review Comment: I think we can deprecate these
Re: [PR] DynFields, DynMethods code cleanup [iceberg]
Fokko commented on code in PR #10543: URL: https://github.com/apache/iceberg/pull/10543#discussion_r1650614916 ## common/src/main/java/org/apache/iceberg/common/DynMethods.java: ## @@ -161,6 +161,8 @@ private BoundMethod(UnboundMethod method, Object receiver) { this.receiver = receiver; } +/** @deprecated since 1.6.0, will be removed in 1.7.0 */ Review Comment: I don't mind having a few additional methods around. They come in pairs of `{invoke, invokeChecked}`. It looks odd if they are sometimes defined and sometimes missing.
Re: [PR] Build: Bump parquet from 1.13.1 to 1.14.1 [iceberg]
Fokko commented on PR #10553: URL: https://github.com/apache/iceberg/pull/10553#issuecomment-2185975514 Blocked by the Gradle update
[I] support bloom-filter writing [iceberg-python]
raphaelauv opened a new issue, #850: URL: https://github.com/apache/iceberg-python/issues/850 ### Feature Request / Improvement Hi, pyiceberg cannot yet write bloom filters; this would be a great feature. Thanks all
Re: [I] [Bug]: Can't create/update tables in REST nessie catalog via Spark. Iceberg Cannot read field "formatVersion" because "x0" is null [iceberg]
Fokko commented on issue #10533: URL: https://github.com/apache/iceberg/issues/10533#issuecomment-2186001634 @ajantha-bhat do you have time to dig into this?
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
Fokko commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650701857 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -159,18 +160,30 @@ public void commit(TableMetadata base, TableMetadata metadata) { int nextVersion = (current.first() != null ? current.first() : 0) + 1; Path finalMetadataFile = metadataFilePath(nextVersion, codec); FileSystem fs = getFileSystem(tempMetadataFile, conf); - -// this rename operation is the atomic commit operation -renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion); - -LOG.info("Committed a new metadata file {}", finalMetadataFile); - -// update the best-effort version pointer -writeVersionHint(nextVersion); - -deleteRemovedMetadataFiles(base, metadata); - -this.shouldRefresh = true; +boolean versionCommitSuccess = false; +try { + fs.delete(versionHintFile(), false /* recursive delete*/); Review Comment: > Sir, the purpose of versionHintFile is simply to speed up the reading of the latest version. It's just an index file. That is correct, but it is also highly coupled to the Iceberg project since Iceberg optimizes for object stores. Object stores are terrible at listing files since responses are often paged, which makes listing both slow and costly; the pointer in the `versionHintFile` therefore makes this much faster. > In a file system that is basically POSIX compliant, we theoretically don't need lockManager to prevent multiple clients from committing concurrently, because fs.rename can do that. How would that work? I think a race condition cannot be avoided: it would pass 99.99% of the time, but writers can still overwrite each other because they don't know that multiple processes are writing to the table.
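To make the trade-off concrete, here is a minimal sketch of what the pointer buys: reading `version-hint.text` is one small GET, while the fallback is a paged LIST over the metadata directory. The helper structure and parsing below are hypothetical, not the actual `HadoopTableOperations` code:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch: resolve the current table version, preferring the
// best-effort version-hint pointer over a full directory listing.
class VersionResolver {

  static int currentVersion(FileSystem fs, Path metadataDir) throws IOException {
    Path hint = new Path(metadataDir, "version-hint.text");
    if (fs.exists(hint)) {
      // one small read: cheap even on object stores
      try (BufferedReader in =
          new BufferedReader(new InputStreamReader(fs.open(hint), StandardCharsets.UTF_8))) {
        return Integer.parseInt(in.readLine().trim());
      }
    }

    // fallback: LIST the directory and take the highest vN.metadata.json;
    // on object stores this means paged, slow, costly requests
    int max = 0;
    for (FileStatus status : fs.listStatus(metadataDir)) {
      String name = status.getPath().getName(); // e.g. "v3.metadata.json"
      if (name.startsWith("v") && name.endsWith(".metadata.json")) {
        try {
          max = Math.max(max, Integer.parseInt(name.substring(1, name.indexOf('.'))));
        } catch (NumberFormatException ignored) {
          // not a versioned metadata file; skip it
        }
      }
    }
    return max;
  }
}
```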
Re: [PR] Api: Track partition statistics via TableMetadata [iceberg]
advancedxy commented on code in PR #8502: URL: https://github.com/apache/iceberg/pull/8502#discussion_r1650705615 ## core/src/main/java/org/apache/iceberg/TableMetadataParser.java: ## @@ -481,6 +488,13 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { statisticsFiles = ImmutableList.of(); } +List partitionStatisticsFiles; +if (node.has(PARTITION_STATISTICS)) { + partitionStatisticsFiles = partitionStatsFilesFromJson(node.get(PARTITION_STATISTICS)); +} else { + partitionStatisticsFiles = ImmutableList.of(); +} + Review Comment: Hi @ajantha-bhat and @aokolnychyi, I have a question about this implementation as I'm exploring adding new fields to TableMetadata. Suppose the table `db.table`'s partition stats are updated by the new version of Iceberg via UpdatePartitionStatistics. After that, some old version of the Iceberg library or the PyIceberg client produces a new commit to this table. Per my understanding, that writer will produce TableMetadata without `PARTITION_STATISTICS` since it knows nothing about that field, which effectively loses the info for the table. Do you have any solutions or ideas on how to prevent such cases? I can think of some potential ideas, such as: 1. upgrade the format_version to a new one whenever we need to add new fields to table metadata; all old clients will then be rejected by the version check. 2. define a writer_version field: old clients can read metadata produced by new clients, but commits from old writers are rejected. 3. move the check to the REST catalog service? I feel it's too heavy to do a format upgrade when only adding new fields to TableMetadata. Do you have any other ideas? Really appreciate your input.
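A minimal sketch of idea (2) above. This is purely hypothetical: no writer-version field exists in `TableMetadata` today, and every name here is invented for illustration:

```java
// Hypothetical forward-compatibility gate (idea 2): any client may read,
// but a client refuses to *commit* metadata produced by a newer writer,
// since round-tripping it would silently drop fields it does not know about.
class WriterVersionGate {

  // the highest metadata writer version this client can faithfully preserve
  static final int SUPPORTED_WRITER_VERSION = 1;

  static void checkCanCommit(int metadataWriterVersion) {
    if (metadataWriterVersion > SUPPORTED_WRITER_VERSION) {
      throw new UnsupportedOperationException(
          "Metadata written by writer version "
              + metadataWriterVersion
              + " is newer than supported version "
              + SUPPORTED_WRITER_VERSION
              + "; refusing to commit to avoid dropping unknown fields");
    }
  }
}
```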
Re: [I] Iceberg Spark streaming skips rows of data [iceberg]
cccs-jc commented on issue #10156: URL: https://github.com/apache/iceberg/issues/10156#issuecomment-2186073673 @singhpk234 any progress on this issue?
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
Fokko commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650711386 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; - } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + if (!nextVersionIsLatest(nextVersion, fs)) { Review Comment: If the locking is implemented correctly, then, if I follow the logic correctly, this should not happen. This operation will list the prefix, which can be costly: - Doing additional operations against the filesystem/object store - Keeping the lock for longer than needed, slowing down the commit process
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
Fokko commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650714966 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; - } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + if (!nextVersionIsLatest(nextVersion, fs)) { +throw new CommitFailedException( +"Cannot commit version [%d] because it is smaller or much larger than the current latest version [%d].Are there other clients running in parallel with the current task?", +nextVersion, findVersionWithOutVersionHint(fs)); } - throw cfe; + return renameMetaDataFileAndCheck(fs, src, dst); } finally { if (!lockManager.release(dst.toString(), src.toString())) { LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); } } } - /** - * Deletes the file from the file system. Any RuntimeException will be caught and returned. - * - * @param path the file to be deleted. - * @return RuntimeException caught, if any. null otherwise. - */ - private RuntimeException tryDelete(Path path) { + protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { +return Util.getFs(path, hadoopConf); + } + + @VisibleForTesting + boolean checkMetaDataFileRenameSuccess( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws IOException { +return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile); + } + + @VisibleForTesting + boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) + throws IOException { +return fs.rename(tempMetaDataFile, finalMetaDataFile); + } + + private boolean renameCheck( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable rootError) { try { - io().deleteFile(path.toString()); - return null; -} catch (RuntimeException re) { - return re; + return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { + LOG.error( + "No correctness check can be performed.src=[{}],dst=[{}].", + tempMetaDataFile, + finalMetaDataFile, + e); + String msg = + String.format( + "Exception thrown when renaming [%s] to [%s].Also no correctness check can be performed.", + tempMetaDataFile, finalMetaDataFile); + throw new CommitStateUnknownException(msg, rootError != null ? rootError : e); } } - protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { -return Util.getFs(path, hadoopConf); + @VisibleForTesting + boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) { +try { + return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { Review Comment: I think it is best to be more specific here: ```suggestion } catch (IOException e) { ```
Re: [I] Issue with CALL parsing [iceberg]
qianzhen0 commented on issue #8343: URL: https://github.com/apache/iceberg/issues/8343#issuecomment-2186085314 In my case, adding `spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` solved the problem
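For readers hitting the same parse error: that extensions module is what registers Iceberg's `CALL` procedure syntax with Spark's parser. A minimal sketch of wiring it into a session follows; the catalog name, warehouse path, table, and snapshot id are placeholder values:

```java
import org.apache.spark.sql.SparkSession;

// Sketch: register Iceberg's SQL extensions so that CALL statements parse.
// "demo" and the warehouse path are placeholder values.
public class CallParsingExample {

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("iceberg-call-demo")
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.demo.type", "hadoop")
            .config("spark.sql.catalog.demo.warehouse", "/tmp/warehouse")
            .getOrCreate();

    // without the extensions config above, this statement fails to parse
    spark.sql("CALL demo.system.rollback_to_snapshot('db.tbl', 1)").show();
  }
}
```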
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
Fokko commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650722647 ## core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java: ## @@ -206,6 +210,139 @@ public void testFailedCommit() throws Exception { Assertions.assertThat(manifests).as("Should contain 0 Avro manifest files").isEmpty(); } + @Test + public void testCommitFailedBeforeChangeVersionHint() throws IOException { +table.newFastAppend().appendFile(FILE_A).commit(); +BaseTable baseTable = (BaseTable) table; +HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); + +HadoopTableOperations spyOps2 = spy(tableOperations); +doReturn(1).when(spyOps2).findVersionWithOutVersionHint(any()); +TableMetadata metadataV1 = spyOps2.current(); +SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build(); +TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort); +assertThatThrownBy(() -> spyOps2.commit(metadataV1, metadataV2)) +.isInstanceOf(CommitFailedException.class) +.hasMessageContaining("Are there other clients running in parallel with the current task?"); + +HadoopTableOperations spyOps3 = spy(tableOperations); +doReturn(false).when(spyOps3).nextVersionIsLatest(any(), any()); +assertCommitNotChangeVersion( +baseTable, +spyOps3, +CommitFailedException.class, +"Are there other clients running in parallel with the current task?"); + +HadoopTableOperations spyOps4 = spy(tableOperations); +doThrow(new RuntimeException("FileSystem crash!")) +.when(spyOps4) +.renameMetaDataFileAndCheck(any(), any(), any()); +assertCommitNotChangeVersion( +baseTable, spyOps4, CommitFailedException.class, "FileSystem crash!"); + } + + @Test + public void testCommitFailedAndCheckFailed() throws IOException { +table.newFastAppend().appendFile(FILE_A).commit(); +BaseTable baseTable = (BaseTable) table; +HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); +HadoopTableOperations spyOps = spy(tableOperations); +doThrow(new RuntimeException("FileSystem crash!")) Review Comment: I think `IOException`s are more appropriate here.
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650728447 ## core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java: ## (same hunk as quoted above) Review Comment: @Fokko Okay
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
Fokko commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650739534 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -355,71 +475,90 @@ int findVersion() { * an attempt will be made to delete the source file. * * @param fs the filesystem used for the rename - * @param src the source file - * @param dst the destination file + * @param tempMetaDataFile the source file + * @param finalMetaDataFile the destination file */ - private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { + @VisibleForTesting + boolean commitNewVersion( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer nextVersion) + throws IOException { try { - if (!lockManager.acquire(dst.toString(), src.toString())) { + if (!lockManager.acquire(finalMetaDataFile.toString(), finalMetaDataFile.toString())) { throw new CommitFailedException( -"Failed to acquire lock on file: %s with owner: %s", dst, src); +"Failed to acquire lock on file: %s with owner: %s", +finalMetaDataFile, tempMetaDataFile); } - if (fs.exists(dst)) { -throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); - } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; + if (fs.exists(finalMetaDataFile)) { +throw new CommitFailedException( +"Version %d already exists: %s", nextVersion, finalMetaDataFile); } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + // maybe too heavy.? + if (!nextVersionIsLatest(nextVersion, fs)) { +// In the case of concurrent execution, +// verify that the version that is ready to be committed at a time is the latest version. +throw new CommitFailedException("Version %d too old: %s", nextVersion, finalMetaDataFile); } - throw cfe; + return renameMetaDataFileAndCheck(fs, tempMetaDataFile, finalMetaDataFile); } finally { - if (!lockManager.release(dst.toString(), src.toString())) { -LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); + if (!lockManager.release(finalMetaDataFile.toString(), finalMetaDataFile.toString())) { +LOG.warn( +"Failed to release lock on file: {} with owner: {}", +finalMetaDataFile, +tempMetaDataFile); } } } - /** - * Deletes the file from the file system. Any RuntimeException will be caught and returned. - * - * @param path the file to be deleted. - * @return RuntimeException caught, if any. null otherwise. - */ - private RuntimeException tryDelete(Path path) { -try { - io().deleteFile(path.toString()); - return null; -} catch (RuntimeException re) { - return re; -} + private void cleanUncommittedMeta(Path src) { +io().deleteFile(src.toString()); } protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { return Util.getFs(path, hadoopConf); } + @VisibleForTesting + boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) { +try { + // The most important step. There must be no mistakes in this step. + // Even if it does, we should stop everything. 
+ return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { + LOG.error("There were some problems with submitting the new version.", e); + try { +if (newMetadataExists(fs, finalMetaDataFile) && !tempMetadataExists(fs, tempMetaDataFile)) { + return true; +} else { + throw new CommitFailedException(e); +} + } catch (CommitFailedException e1) { +throw e1; + } catch (Throwable e2) { +throw new CommitStateUnknownException(e2); +} +} + } + + @VisibleForTesting Review Comment: With OOP you don't want to assert the calls; you want to validate the behavior of the `HadoopTableOperations`. To do this you create scenarios where you use Mockito to let certain operations fail (for example `fs.rename`), and then you assert the desired state. Opening up the object for testing is generally considered bad practice. An example can be found here: https://github.com/apache/iceberg/blob/29fd2a0cb940fd16aa5bfb8003ec1354e678c949/core/src/test/java/org/apache/iceberg/hadoo
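A condensed sketch of the behavior-driven pattern Fokko describes, modeled on the PR's own `TestHadoopCommits` changes. It assumes the PR's package-private `renameMetaDataFileAndCheck` hook and a `table` fixture from that harness: force the low-level failure with Mockito, then assert the observable commit outcome instead of verifying internal calls.

```java
// lives in this package so the PR's package-private hook is visible
package org.apache.iceberg.hadoop;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.junit.jupiter.api.Test;

class TestRenameFailureBehavior {

  private Table table; // provided by the TestHadoopCommits harness (setup omitted)

  @Test
  void commitFailsCleanlyWhenRenameCrashes() {
    BaseTable baseTable = (BaseTable) table;
    HadoopTableOperations ops = spy((HadoopTableOperations) baseTable.operations());

    // force the low-level failure instead of exposing internals
    doThrow(new RuntimeException("FileSystem crash!"))
        .when(ops)
        .renameMetaDataFileAndCheck(any(), any(), any());

    TableMetadata base = ops.current();
    TableMetadata update =
        base.replaceSortOrder(SortOrder.builderFor(baseTable.schema()).asc("data").build());

    // assert the observable behavior, not the internal call sequence
    assertThatThrownBy(() -> ops.commit(base, update))
        .isInstanceOf(CommitFailedException.class)
        .hasMessageContaining("FileSystem crash!");
  }
}
```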
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650741211 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; - } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + if (!nextVersionIsLatest(nextVersion, fs)) { +throw new CommitFailedException( +"Cannot commit version [%d] because it is smaller or much larger than the current latest version [%d].Are there other clients running in parallel with the current task?", +nextVersion, findVersionWithOutVersionHint(fs)); } - throw cfe; + return renameMetaDataFileAndCheck(fs, src, dst); } finally { if (!lockManager.release(dst.toString(), src.toString())) { LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); } } } - /** - * Deletes the file from the file system. Any RuntimeException will be caught and returned. - * - * @param path the file to be deleted. - * @return RuntimeException caught, if any. null otherwise. - */ - private RuntimeException tryDelete(Path path) { + protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { +return Util.getFs(path, hadoopConf); + } + + @VisibleForTesting + boolean checkMetaDataFileRenameSuccess( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws IOException { +return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile); + } + + @VisibleForTesting + boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) + throws IOException { +return fs.rename(tempMetaDataFile, finalMetaDataFile); + } + + private boolean renameCheck( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable rootError) { try { - io().deleteFile(path.toString()); - return null; -} catch (RuntimeException re) { - return re; + return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { + LOG.error( + "No correctness check can be performed.src=[{}],dst=[{}].", + tempMetaDataFile, + finalMetaDataFile, + e); + String msg = + String.format( + "Exception thrown when renaming [%s] to [%s].Also no correctness check can be performed.", + tempMetaDataFile, finalMetaDataFile); + throw new CommitStateUnknownException(msg, rootError != null ? rootError : e); } } - protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { -return Util.getFs(path, hadoopConf); + @VisibleForTesting + boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) { +try { + return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { Review Comment: > I think it is best to be more specific here: @Fokko This is a crucial step. I think we should enlarge the scope of the exception detection. This is because at this point we may encounter OOM exceptions and so on. Detecting only IO exceptions will miss boundary cases such as runtime exceptions. 
Our intention is that, whatever we encounter in this step, we throw a CommitStateUnknownException to ensure that we don't trigger other actions that could affect the correctness of the data, such as file cleanup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
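For illustration, a minimal sketch of the catch-all pattern argued for above, assuming a hypothetical wrapper around `fs.rename` (the helper and its name are not the PR's actual code; the `CommitStateUnknownException` constructors do appear in the quoted diff):

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.exceptions.CommitStateUnknownException;

class RenameGuard {
  // Catch Throwable rather than IOException: an OOM or other runtime fault at
  // this point also leaves the commit outcome unknown, and the caller must
  // then skip any cleanup (such as deleting the temp metadata file).
  static boolean renameAndGuard(FileSystem fs, Path src, Path dst) {
    try {
      return fs.rename(src, dst); // the (intended) atomic commit step
    } catch (Throwable t) {
      throw new CommitStateUnknownException(t);
    }
  }
}
```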
Re: [I] [Bug]: Can't create/update tables in REST nessie catalog via Spark. Iceberg Cannot read field "formatVersion" because "x0" is null [iceberg]
ajantha-bhat commented on issue #10533: URL: https://github.com/apache/iceberg/issues/10533#issuecomment-2186193549 yeah. We will check it. cc: @dimas-b, @snazy -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650769239 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -159,18 +160,30 @@ public void commit(TableMetadata base, TableMetadata metadata) { int nextVersion = (current.first() != null ? current.first() : 0) + 1; Path finalMetadataFile = metadataFilePath(nextVersion, codec); FileSystem fs = getFileSystem(tempMetadataFile, conf); - -// this rename operation is the atomic commit operation -renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion); - -LOG.info("Committed a new metadata file {}", finalMetadataFile); - -// update the best-effort version pointer -writeVersionHint(nextVersion); - -deleteRemovedMetadataFiles(base, metadata); - -this.shouldRefresh = true; +boolean versionCommitSuccess = false; +try { + fs.delete(versionHintFile(), false /* recursive delete*/); Review Comment: @Fokko >How would that work? I think a race condition cannot be avoided, where it would pass 99.99% of the time, but it can be that they overwrite each other because they don't know that multiple processes are writing to the table. For filesystems that support atomic operations, rename operations do not overwrite each other. On the other hand, this operation is not supported in object storage. Due to the atomic nature of the operation, each client will know if its operation succeeded or not when it renames. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
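A minimal sketch of why an atomic rename can arbitrate concurrent committers, assuming an HDFS-like filesystem whose rename is atomic and does not overwrite an existing destination; the helper is illustrative, not Iceberg's implementation:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class AtomicCommit {
  // With an atomic rename, two clients racing to the same destination cannot
  // both win: exactly one rename returns true, and the loser learns from the
  // false result (or the pre-existing destination) that its commit did not land.
  static boolean tryCommit(FileSystem fs, Path temp, Path finalVersion) throws IOException {
    if (fs.exists(finalVersion)) {
      return false; // another client already committed this version
    }
    return fs.rename(temp, finalVersion);
  }
}
```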
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1650772614 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -159,18 +160,30 @@ public void commit(TableMetadata base, TableMetadata metadata) { int nextVersion = (current.first() != null ? current.first() : 0) + 1; Path finalMetadataFile = metadataFilePath(nextVersion, codec); FileSystem fs = getFileSystem(tempMetadataFile, conf); - -// this rename operation is the atomic commit operation -renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion); - -LOG.info("Committed a new metadata file {}", finalMetadataFile); - -// update the best-effort version pointer -writeVersionHint(nextVersion); - -deleteRemovedMetadataFiles(base, metadata); - -this.shouldRefresh = true; +boolean versionCommitSuccess = false; +try { + fs.delete(versionHintFile(), false /* recursive delete*/); Review Comment: > That is correct, but it is also highly coupled to the Iceberg project since Iceberg optimizes for object stores. Object stores are terrible at listing files since they are often paged responses. These are both slow and costly, therefore the pointer in the `versionHintFile` makes this much faster. @Fokko For object storage: since the current implementation of fileSystemCatalog is based on the assumption that the file system supports atomic operations, it cannot be used with object storage as-is. Either we provide another implementation, or we put some CNCF middleware in front as a proxy, which can usually provide atomic-operation semantics. Example: cubefs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Fix duplicate data in Flink's upsert writer for format V2 [iceberg]
pvary commented on code in PR #10526: URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650849668 ## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java: ## @@ -426,30 +425,44 @@ private void commitOperation( } @Override - public void processElement(StreamRecord<WriteResult> element) { -this.writeResultsOfCurrentCkpt.add(element.getValue()); + public void processElement(StreamRecord<FlinkWriteResult> element) { +FlinkWriteResult flinkWriteResult = element.getValue(); +List<WriteResult> writeResults = +writeResultsSinceLastSnapshot.computeIfAbsent( +flinkWriteResult.checkpointId(), k -> Lists.newArrayList()); +writeResults.add(flinkWriteResult.writeResult()); } @Override public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. -long currentCheckpointId = Long.MAX_VALUE; -dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); -writeResultsOfCurrentCkpt.clear(); - +long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID; +writeToManifestSinceLastSnapshot(currentCheckpointId); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId); } + private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException { +if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) { + dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA); +} + +for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt : +writeResultsSinceLastSnapshot.entrySet()) { + dataFilesPerCheckpoint.put( Review Comment: Is this a scenario which could happen?
- Writer 1 writes data
- Writer 2 writes data
- Writer 1 prepareSnapshotPreBarrier for Checkpoint 1, File 1-1 with checkpoint 1 created
- Committer receives File 1-1
- Writer 2 prepareSnapshotPreBarrier for Checkpoint 1, File 1-2 with checkpoint 1 created
- Committer receives File 1-2
- Writer 1 writes data
- Writer 2 writes data
- Writer 1 prepareSnapshotPreBarrier for Checkpoint 2, File 2-1 with checkpoint 2 created
- Committer receives File 2-1
- Committer starts snapshotState for Checkpoint 1
- At this stage we have `writeResultsSinceLastSnapshot` with more data, but not all of them are ready to commit.
- Committer receives File 2-1
- Committer starts snapshotState for Checkpoint 2

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
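To make the ordering concern concrete, here is a hedged sketch of per-checkpoint buffering, where a snapshot for checkpoint N drains only results with checkpoint ID <= N and leaves later arrivals (such as File 2-1 above) buffered; the class and method names are hypothetical, not the PR's API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class CheckpointBuffer<R> {
  // Results are keyed by the checkpoint that produced them.
  private final NavigableMap<Long, List<R>> resultsByCheckpoint = new TreeMap<>();

  void add(long checkpointId, R result) {
    resultsByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(result);
  }

  // A snapshot for checkpointId drains only entries with key <= checkpointId;
  // results that already arrived for later checkpoints stay buffered.
  Map<Long, List<R>> drainUpTo(long checkpointId) {
    NavigableMap<Long, List<R>> ready = resultsByCheckpoint.headMap(checkpointId, true);
    Map<Long, List<R>> drained = new TreeMap<>(ready);
    ready.clear(); // the headMap view writes through to the backing map
    return drained;
  }
}
```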
Re: [PR] Flink: Fix duplicate data in Flink's upsert writer for format V2 [iceberg]
pvary commented on code in PR #10526: URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650851730 ## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java: ## @@ -426,30 +425,44 @@ private void commitOperation( } @Override - public void processElement(StreamRecord<WriteResult> element) { -this.writeResultsOfCurrentCkpt.add(element.getValue()); + public void processElement(StreamRecord<FlinkWriteResult> element) { +FlinkWriteResult flinkWriteResult = element.getValue(); +List<WriteResult> writeResults = +writeResultsSinceLastSnapshot.computeIfAbsent( +flinkWriteResult.checkpointId(), k -> Lists.newArrayList()); +writeResults.add(flinkWriteResult.writeResult()); } @Override public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. -long currentCheckpointId = Long.MAX_VALUE; -dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); -writeResultsOfCurrentCkpt.clear(); - +long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID; +writeToManifestSinceLastSnapshot(currentCheckpointId); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId); } + private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException { +if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) { Review Comment: What if we don't have data for the last checkpoint, but we have data for the previous checkpoints? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
[PR] Refactored data clumps with the help of LLMs (research project) [iceberg]
compf opened a new pull request, #10558: URL: https://github.com/apache/iceberg/pull/10558 Hello maintainers, I am conducting a master thesis project focused on enhancing code quality through automated refactoring of data clumps, assisted by Large Language Models (LLMs). Data clump definition: a data clump exists if
1. two methods (in the same or in different classes) have at least 3 common parameters and one of those methods does not override the other, or
2. at least three fields in a class are common with the parameters of a method (in the same or in a different class), or
3. two different classes have at least three common fields.
See also the following UML diagram as an example.  I believe these refactorings can contribute to the project by reducing complexity and enhancing the readability of your source code. Pursuant to the EU AI Act, I fully disclose the use of LLMs in generating these refactorings, emphasizing that all changes have undergone human review for quality assurance. Even if you decide not to integrate my changes into your codebase (which is perfectly fine), I ask you to fill out a feedback survey, which will be scientifically evaluated to determine the acceptance of AI-supported refactorings. You can find the feedback survey at https://campus.lamapoll.de/Data-clump-refactoring/en Thank you for considering my contribution. I look forward to your feedback. If you have any other questions or comments, feel free to write a comment or email me at tschoema...@uni-osnabrueck.de . Best regards, Timo Schoemaker Department of Computer Science University of Osnabrück -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] refactor: Remove dependency of tokio. [iceberg-rust]
Xuanwo commented on issue #418: URL: https://github.com/apache/iceberg-rust/issues/418#issuecomment-2186329846 Let me do this. I feel like it would be a good win to remove the direct dependency on tokio. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Fix duplicate data in Flink's upsert writer for format V2 [iceberg]
zhongqishang commented on code in PR #10526: URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650864804 ## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java: ## @@ -426,30 +425,44 @@ private void commitOperation( } @Override - public void processElement(StreamRecord<WriteResult> element) { -this.writeResultsOfCurrentCkpt.add(element.getValue()); + public void processElement(StreamRecord<FlinkWriteResult> element) { +FlinkWriteResult flinkWriteResult = element.getValue(); +List<WriteResult> writeResults = +writeResultsSinceLastSnapshot.computeIfAbsent( +flinkWriteResult.checkpointId(), k -> Lists.newArrayList()); +writeResults.add(flinkWriteResult.writeResult()); } @Override public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. -long currentCheckpointId = Long.MAX_VALUE; -dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); -writeResultsOfCurrentCkpt.clear(); - +long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID; +writeToManifestSinceLastSnapshot(currentCheckpointId); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId); } + private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException { +if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) { Review Comment: The last checkpoint will be set to `EMPTY_MANIFEST_DATA`, and then the subsequent logic is executed, looping over `writeResultsSinceLastSnapshot` to generate the manifest data corresponding to each checkpoint ID. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] DynFields, DynMethods code cleanup [iceberg]
findepi commented on code in PR #10543: URL: https://github.com/apache/iceberg/pull/10543#discussion_r1650894264 ## common/src/main/java/org/apache/iceberg/common/DynMethods.java: ## @@ -161,6 +161,8 @@ private BoundMethod(UnboundMethod method, Object receiver) { this.receiver = receiver; } +/** @deprecated since 1.6.0, will be removed in 1.7.0 */ Review Comment: My thinking is that any variant needed in the future is very easy to add when needed and doesn't necessitate keeping currently dead code around. Removing dead code clears the picture and opens eyes to further simplifications. Does this make sense? BTW, recently I was staring at a pretty complex class that was negatively impacted by some other changes, only to eventually realize it's used only in its own tests and could simply be deleted. What a relief. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Improve MetricsReporter loading with class loader fallback [iceberg]
findepi commented on code in PR #10459: URL: https://github.com/apache/iceberg/pull/10459#discussion_r1650899387 ## core/src/test/resources/com/example/target/custom-metrics-reporter-1.0-SNAPSHOT.jar: ## Review Comment: You can create a new classloader with e.g. `new URLClassLoader` and this doesn't require adding a binary jar to the repo. You can follow the example of https://github.com/trinodb/trino/blob/c0276cea688da155f7f5bf0cb0b53169ece43e6e/core/trino-main/src/main/java/io/trino/sql/gen/IsolatedClass.java#L32 to see e.g. how you can populate such a classloader with a class that you compiled normally and is part of your classpath. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
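As a rough illustration of that suggestion (a hypothetical helper, not the linked Trino class): read the bytecode of a class compiled normally on the test classpath and define a copy of it in a parentless loader, so the copy is invisible to the application classloader:

```java
import java.io.IOException;
import java.io.InputStream;

class IsolatedLoader extends ClassLoader {
  IsolatedLoader() {
    super(null); // no parent: nothing from the application classpath leaks in
  }

  // Reads the bytecode of a class that was compiled normally and defines a
  // copy of it in this loader, so the copy is a distinct Class object that
  // the caller's classloader cannot see.
  Class<?> defineCopyOf(Class<?> clazz) throws IOException {
    String resource = clazz.getName().replace('.', '/') + ".class";
    try (InputStream in = clazz.getClassLoader().getResourceAsStream(resource)) {
      byte[] bytes = in.readAllBytes();
      return defineClass(clazz.getName(), bytes, 0, bytes.length);
    }
  }
}
```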
Re: [I] Race condition in CachingCatalog [iceberg]
findepi commented on issue #10493: URL: https://github.com/apache/iceberg/issues/10493#issuecomment-2186420099 @singhpk234 indeed this would alleviate the concern, but I don't see a way to implement caching in the `Table loadTable(TableIdentifier)` method. We don't have the snapshot ID yet, and once we have it (after delegating the load), there is no way to use the cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Add configurable cache expiry to caching catalog [iceberg]
findepi commented on code in PR #3543: URL: https://github.com/apache/iceberg/pull/3543#discussion_r1650929394 ## core/src/main/java/org/apache/iceberg/CachingCatalog.java: ## @@ -29,24 +34,91 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps an Iceberg Catalog to cache tables. + * + * See {@link CatalogProperties#CACHE_EXPIRATION_INTERVAL_MS} for more details + * regarding special values for {@code expirationIntervalMillis}. + */ public class CachingCatalog implements Catalog { + + private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class); + private static final RemovalListener<TableIdentifier, Table> identLoggingRemovalListener = + (key, value, cause) -> LOG.debug("Evicted {} from the table cache ({})", key, cause); + public static Catalog wrap(Catalog catalog) { -return wrap(catalog, true); +return wrap(catalog, CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF); + } + + public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) { +return wrap(catalog, true, expirationIntervalMillis); } - public static Catalog wrap(Catalog catalog, boolean caseSensitive) { -return new CachingCatalog(catalog, caseSensitive); + public static Catalog wrap(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) { +return new CachingCatalog(catalog, caseSensitive, expirationIntervalMillis); } - private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder().softValues().build(); private final Catalog catalog; private final boolean caseSensitive; + @SuppressWarnings("checkstyle:VisibilityModifier") + protected final long expirationIntervalMillis; + @SuppressWarnings("checkstyle:VisibilityModifier") + protected final Cache<TableIdentifier, Table> tableCache; + + private CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) { +this(catalog, caseSensitive, expirationIntervalMillis, Ticker.systemTicker()); + } - private CachingCatalog(Catalog catalog, boolean caseSensitive) { + @SuppressWarnings("checkstyle:VisibilityModifier") + protected CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis, Ticker ticker) { +Preconditions.checkArgument(expirationIntervalMillis != 0, +"When %s is set to 0, the catalog cache should be disabled. This indicates a bug.", +CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS); this.catalog = catalog; this.caseSensitive = caseSensitive; +this.expirationIntervalMillis = expirationIntervalMillis; +this.tableCache = createTableCache(ticker); + } + + /** + * CacheWriter class for removing metadata tables when their associated data table is expired + * via cache expiration. + */ + class MetadataTableInvalidatingCacheWriter implements CacheWriter<TableIdentifier, Table> { +@Override +public void write(TableIdentifier tableIdentifier, Table table) { +} + +@Override +public void delete(TableIdentifier tableIdentifier, Table table, RemovalCause cause) { + if (RemovalCause.EXPIRED.equals(cause)) { +if (!MetadataTableUtils.hasMetadataTableName(tableIdentifier)) { + tableCache.invalidateAll(metadataTableIdentifiers(tableIdentifier)); +} Review Comment: This obviously races with hash puts done in `org.apache.iceberg.CachingCatalog#loadTable`.
Would be great to have a code comment instructing the reader why it's not a problem. Is the thinking that this is not required for correctness and therefore does not have to be 100% accurate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
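For context, a minimal sketch of the Caffeine wiring under discussion, assuming time-based expiry plus a removal listener; this is illustrative, not the PR's exact cache construction, and the comment inside marks where the racy invalidation would happen:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.time.Duration;

class ExpiringTableCache {
  static Cache<String, Object> create(long expirationIntervalMillis) {
    return Caffeine.newBuilder()
        .expireAfterAccess(Duration.ofMillis(expirationIntervalMillis))
        .removalListener(
            (String key, Object table, RemovalCause cause) -> {
              if (cause == RemovalCause.EXPIRED) {
                // Derived entries (e.g. metadata tables) would be invalidated
                // here; note the listener runs asynchronously, so it can race
                // with concurrent puts as described in the review comment.
              }
            })
        .build();
  }
}
```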
Re: [PR] AWS: Add FileIO tracker/closer to Glue catalog [iceberg]
findepi commented on code in PR #8315: URL: https://github.com/apache/iceberg/pull/8315#discussion_r1650952000 ## aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java: ## @@ -619,4 +635,17 @@ public void setConf(Configuration conf) { protected Map<String, String> properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } + + private Cache<TableOperations, FileIO> newFileIOCloser() { +return Caffeine.newBuilder() +.weakKeys() +.removalListener( +(RemovalListener<TableOperations, FileIO>) +(ops, fileIO, cause) -> { + if (null != fileIO) { +fileIO.close(); Review Comment: This is not a regular cache; it could be better served with `java.lang.ref.Cleaner`. However, none of these addresses the main problem that we use Java GC as a proxy to discover and close resource leaks. I understand that fileIO instances can hold on to resources that require closing (like file descriptors or sockets inside the AWS S3 client). If I have an abundance of memory, the weakly held instances may not get collected, so the app can run out of descriptors. Can you please help me understand why this isn't a concern? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
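A hedged sketch of the `java.lang.ref.Cleaner` alternative mentioned above (the class and method names are hypothetical); note it shares the same fundamental limitation, since GC timing still decides when `close()` runs:

```java
import java.lang.ref.Cleaner;

class FileIOTracker {
  private static final Cleaner CLEANER = Cleaner.create();

  // Closes the FileIO once the owning object becomes unreachable. The cleanup
  // action must not reference the owner itself, only the resource to close.
  static void track(Object owner, AutoCloseable fileIO) {
    CLEANER.register(owner, () -> {
      try {
        fileIO.close();
      } catch (Exception e) {
        // best-effort cleanup; nowhere meaningful to propagate the failure
      }
    });
  }
}
```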
Re: [PR] DynConstructors cleanup [iceberg]
nastra commented on code in PR #10542: URL: https://github.com/apache/iceberg/pull/10542#discussion_r1650975795 ## common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.common; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestDynConstructors { + @Test + public void testImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder().impl(MyClass.class).buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +.impl("org.apache.iceberg.common.TestDynConstructors$MyClass") +.buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceWrongImplString() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +// TODO this should throw, since the MyUnrelatedClass does not implement MyInterface Review Comment: still a TODO left here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] DynConstructors cleanup [iceberg]
nastra commented on code in PR #10542: URL: https://github.com/apache/iceberg/pull/10542#discussion_r1650976468 ## common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.common; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestDynConstructors { + @Test + public void testImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder().impl(MyClass.class).buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +.impl("org.apache.iceberg.common.TestDynConstructors$MyClass") +.buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceWrongImplString() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +// TODO this should throw, since the MyUnrelatedClass does not implement MyInterface + .impl("org.apache.iceberg.common.TestDynConstructors$MyUnrelatedClass") +.buildChecked(); +assertThatThrownBy(ctor::newInstance) +.isInstanceOf(ClassCastException.class) +.hasMessage( +"org.apache.iceberg.common.TestDynConstructors$MyUnrelatedClass cannot be cast to org.apache.iceberg.common.TestDynConstructors$MyInterface"); + } + + @Test + public void testInterfaceWrongImplClass() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +// TODO this should throw or not compile at all, since the MyUnrelatedClass does not Review Comment: TODO left here. Are you planning to address this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] DynConstructors cleanup [iceberg]
findepi commented on code in PR #10542: URL: https://github.com/apache/iceberg/pull/10542#discussion_r1651007712 ## common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.common; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestDynConstructors { + @Test + public void testImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder().impl(MyClass.class).buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +.impl("org.apache.iceberg.common.TestDynConstructors$MyClass") +.buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceWrongImplString() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +// TODO this should throw, since the MyUnrelatedClass does not implement MyInterface Review Comment: This is reflection on current state of the API, pre-existing problem. I can remove this TODO if this helps. I did not plan to address this as part of this cleanup PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [I] Field comments are not written for timestamp field [iceberg]
ariksa1 commented on issue #4212: URL: https://github.com/apache/iceberg/issues/4212#issuecomment-2186546553 Hi, is there any update regarding this issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] DynConstructors cleanup [iceberg]
findepi commented on code in PR #10542: URL: https://github.com/apache/iceberg/pull/10542#discussion_r1651013839 ## common/src/test/java/org/apache/iceberg/common/TestDynConstructors.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.common; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +public class TestDynConstructors { + @Test + public void testImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder().impl(MyClass.class).buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceImplNewInstance() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +.impl("org.apache.iceberg.common.TestDynConstructors$MyClass") +.buildChecked(); +assertThat(ctor.newInstance()).isInstanceOf(MyClass.class); + } + + @Test + public void testInterfaceWrongImplString() throws Exception { +DynConstructors.Ctor ctor = +DynConstructors.builder(MyInterface.class) +// TODO this should throw, since the MyUnrelatedClass does not implement MyInterface Review Comment: BTW, my thinking about TODOs in tests is this: a test should prevent undesired behavioral changes in the implementation. If I write a test exercising specific behavior and I don't think a change in that behavior should be prevented, I put a comment like this one. If I don't put the comment in, someone may treat the test as asserting something important and thus give up an otherwise good change. I understand this is a bit convoluted, but it's a lesson learned from Trino about how contributors sometimes react to test failures. I've seen test failures cause otherwise good code to become less good, because the contributor considered modifying the test not to be an option. Alternatively, I could drop the test, but I am not convinced that would be a better option. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651047933 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -370,58 +402,70 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; - } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + if (!nextVersionIsLatest(nextVersion, fs)) { +throw new CommitFailedException( +"Cannot commit version [%d] because it is smaller or much larger than the current latest version [%d].Are there other clients running in parallel with the current task?", +nextVersion, findVersionWithOutVersionHint(fs)); } - throw cfe; + return renameMetaDataFileAndCheck(fs, src, dst); } finally { if (!lockManager.release(dst.toString(), src.toString())) { LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); } } } - /** - * Deletes the file from the file system. Any RuntimeException will be caught and returned. - * - * @param path the file to be deleted. - * @return RuntimeException caught, if any. null otherwise. - */ - private RuntimeException tryDelete(Path path) { + protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { +return Util.getFs(path, hadoopConf); + } + + @VisibleForTesting + boolean checkMetaDataFileRenameSuccess( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) throws IOException { +return fs.exists(finalMetaDataFile) && !fs.exists(tempMetaDataFile); + } + + @VisibleForTesting + boolean renameMetaDataFile(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) + throws IOException { +return fs.rename(tempMetaDataFile, finalMetaDataFile); + } + + private boolean renameCheck( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Throwable rootError) { try { - io().deleteFile(path.toString()); - return null; -} catch (RuntimeException re) { - return re; + return checkMetaDataFileRenameSuccess(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { + LOG.error( + "No correctness check can be performed.src=[{}],dst=[{}].", + tempMetaDataFile, + finalMetaDataFile, + e); + String msg = + String.format( + "Exception thrown when renaming [%s] to [%s].Also no correctness check can be performed.", + tempMetaDataFile, finalMetaDataFile); + throw new CommitStateUnknownException(msg, rootError != null ? rootError : e); } } - protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { -return Util.getFs(path, hadoopConf); + @VisibleForTesting + boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) { +try { + return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { Review Comment: @Fokko Sir, thanks for pointing out the problem here, I've made this part of the exception handling a bit more detailed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651072649 ## core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java: ## @@ -206,6 +210,139 @@ public void testFailedCommit() throws Exception { Assertions.assertThat(manifests).as("Should contain 0 Avro manifest files").isEmpty(); } + @Test + public void testCommitFailedBeforeChangeVersionHint() throws IOException { +table.newFastAppend().appendFile(FILE_A).commit(); +BaseTable baseTable = (BaseTable) table; +HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); + +HadoopTableOperations spyOps2 = spy(tableOperations); +doReturn(1).when(spyOps2).findVersionWithOutVersionHint(any()); +TableMetadata metadataV1 = spyOps2.current(); +SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build(); +TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort); +assertThatThrownBy(() -> spyOps2.commit(metadataV1, metadataV2)) +.isInstanceOf(CommitFailedException.class) +.hasMessageContaining("Are there other clients running in parallel with the current task?"); + +HadoopTableOperations spyOps3 = spy(tableOperations); +doReturn(false).when(spyOps3).nextVersionIsLatest(any(), any()); +assertCommitNotChangeVersion( +baseTable, +spyOps3, +CommitFailedException.class, +"Are there other clients running in parallel with the current task?"); + +HadoopTableOperations spyOps4 = spy(tableOperations); +doThrow(new RuntimeException("FileSystem crash!")) +.when(spyOps4) +.renameMetaDataFileAndCheck(any(), any(), any()); +assertCommitNotChangeVersion( +baseTable, spyOps4, CommitFailedException.class, "FileSystem crash!"); + } + + @Test + public void testCommitFailedAndCheckFailed() throws IOException { +table.newFastAppend().appendFile(FILE_A).commit(); +BaseTable baseTable = (BaseTable) table; +HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); +HadoopTableOperations spyOps = spy(tableOperations); +doThrow(new RuntimeException("FileSystem crash!")) Review Comment: @Fokko done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651082514 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; - } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + if (!nextVersionIsLatest(nextVersion, fs)) { Review Comment: @Fokko For object stores, fs.rename may leave an incomplete dst file. This is because many object stores implement the rename operation as write(). As a result, all of the logic we have implemented in hadooptableOptions is not really applicable to object stores. For filesystems that support atomic operations, the list operation should take an acceptable amount of time if the number of metadata files is not very large. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651099384 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; - } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + if (!nextVersionIsLatest(nextVersion, fs)) { Review Comment: @Fokko Also, I admit this check does not solve the problem in all scenarios at the moment; a better approach would be to call fs.rename and, on success, verify that the commit is intact (this is the part of the code I want to submit a PR for later). This check is just intended to fail the client as soon as possible, because there are a lot of clients running Flink/Kafka Streams writers against the table at the same time the compaction task runs. Failing fast is friendlier to them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
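As a sketch of what a hint-free latest-version check implies, assuming metadata files named like `v<N>.metadata.json` (an assumption; compression-suffixed names would need a wider pattern): list the metadata directory and take the maximum parsed version. The listing cost matches the trade-off discussed earlier, acceptable on real filesystems, expensive on paged object-store listings:

```java
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class VersionScan {
  // Assumed naming scheme; compressed metadata files would need a wider pattern.
  private static final Pattern VERSION_FILE = Pattern.compile("v(\\d+)\\.metadata\\.json");

  // Lists the metadata directory and returns the highest version found,
  // or 0 if no metadata file matches.
  static int findLatestVersion(FileSystem fs, Path metadataDir) throws IOException {
    int max = 0;
    for (FileStatus status : fs.listStatus(metadataDir)) {
      Matcher m = VERSION_FILE.matcher(status.getPath().getName());
      if (m.matches()) {
        max = Math.max(max, Integer.parseInt(m.group(1)));
      }
    }
    return max;
  }
}
```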
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651106651 ## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ## @@ -355,71 +475,90 @@ int findVersion() { * an attempt will be made to delete the source file. * * @param fs the filesystem used for the rename - * @param src the source file - * @param dst the destination file + * @param tempMetaDataFile the source file + * @param finalMetaDataFile the destination file */ - private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { + @VisibleForTesting + boolean commitNewVersion( + FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer nextVersion) + throws IOException { try { - if (!lockManager.acquire(dst.toString(), src.toString())) { + if (!lockManager.acquire(finalMetaDataFile.toString(), finalMetaDataFile.toString())) { throw new CommitFailedException( -"Failed to acquire lock on file: %s with owner: %s", dst, src); +"Failed to acquire lock on file: %s with owner: %s", +finalMetaDataFile, tempMetaDataFile); } - if (fs.exists(dst)) { -throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); - } - - if (!fs.rename(src, dst)) { -CommitFailedException cfe = -new CommitFailedException("Failed to commit changes using rename: %s", dst); -RuntimeException re = tryDelete(src); -if (re != null) { - cfe.addSuppressed(re); -} -throw cfe; + if (fs.exists(finalMetaDataFile)) { +throw new CommitFailedException( +"Version %d already exists: %s", nextVersion, finalMetaDataFile); } -} catch (IOException e) { - CommitFailedException cfe = - new CommitFailedException(e, "Failed to commit changes using rename: %s", dst); - RuntimeException re = tryDelete(src); - if (re != null) { -cfe.addSuppressed(re); + // maybe too heavy.? + if (!nextVersionIsLatest(nextVersion, fs)) { +// In the case of concurrent execution, +// verify that the version that is ready to be committed at a time is the latest version. +throw new CommitFailedException("Version %d too old: %s", nextVersion, finalMetaDataFile); } - throw cfe; + return renameMetaDataFileAndCheck(fs, tempMetaDataFile, finalMetaDataFile); } finally { - if (!lockManager.release(dst.toString(), src.toString())) { -LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); + if (!lockManager.release(finalMetaDataFile.toString(), finalMetaDataFile.toString())) { +LOG.warn( +"Failed to release lock on file: {} with owner: {}", +finalMetaDataFile, +tempMetaDataFile); } } } - /** - * Deletes the file from the file system. Any RuntimeException will be caught and returned. - * - * @param path the file to be deleted. - * @return RuntimeException caught, if any. null otherwise. - */ - private RuntimeException tryDelete(Path path) { -try { - io().deleteFile(path.toString()); - return null; -} catch (RuntimeException re) { - return re; -} + private void cleanUncommittedMeta(Path src) { +io().deleteFile(src.toString()); } protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { return Util.getFs(path, hadoopConf); } + @VisibleForTesting + boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) { +try { + // The most important step. There must be no mistakes in this step. + // Even if it does, we should stop everything. 
+ return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile); +} catch (Throwable e) { + LOG.error("There were some problems with submitting the new version.", e); + try { +if (newMetadataExists(fs, finalMetaDataFile) && !tempMetadataExists(fs, tempMetaDataFile)) { + return true; +} else { + throw new CommitFailedException(e); +} + } catch (CommitFailedException e1) { +throw e1; + } catch (Throwable e2) { +throw new CommitStateUnknownException(e2); + } +} + } + + @VisibleForTesting Review Comment: @Fokko Yes, but in practice many detailed scenarios are difficult to create locally. For example: halfway through a method call, every access to the filesystem suddenly starts throwing exceptions. I don't know how to simulate this scenario. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
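One conceivable way to simulate "every filesystem access suddenly fails" in a unit test is a Mockito mock whose relevant methods all throw; this is only a sketch, not the test style the PR settled on (which spies on the operations object instead):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class FsFailureSimulation {
  // Every stubbed filesystem call fails, approximating "all accesses to the
  // filesystem suddenly throw" for whatever code path is under test.
  static FileSystem brokenFs() throws IOException {
    FileSystem fs = mock(FileSystem.class);
    doThrow(new IOException("FileSystem crash!")).when(fs).rename(any(Path.class), any(Path.class));
    doThrow(new IOException("FileSystem crash!")).when(fs).exists(any(Path.class));
    doThrow(new IOException("FileSystem crash!")).when(fs).delete(any(Path.class), anyBoolean());
    return fs;
  }
}
```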
Re: [PR] Flink: Maintenance - TriggerManager [iceberg]
stevenzwu commented on code in PR #10484: URL: https://github.com/apache/iceberg/pull/10484#discussion_r1648221004 ## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JVMBasedLockFactory.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.concurrent.Semaphore; +import org.apache.flink.annotation.Internal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The locks are based on static {@link Semaphore} objects. We expect that the {@link + * TriggerManager} and the LockRemover operators will be placed on the same TaskManager (JVM), as + * they are both global. In this case JVM based locking should be enough to allow communication + * between the operators. + */ +@Internal +public class JVMBasedLockFactory implements TriggerLockFactory { Review Comment: nit: JVM seems redundant as this is Java code. maybe `SemaphoreLockFactory` or `InMemorySemaphoreLockFactory`? ## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** . */ +@Internal +class TriggerManager extends KeyedProcessFunction +implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List taskNames; + private final List evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List triggerCounters; + private transient ValueState nextEvaluationTime; + private transient ListState accumulatedChanges; + private transient ListState lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock re
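For reference, a minimal sketch of the semaphore-based, single-JVM locking idea behind the factory discussed above; the class and method names here are illustrative, not the PR's `TriggerLockFactory` API:

```java
import java.util.concurrent.Semaphore;

class InMemoryLock {
  // One permit: at most one maintenance task may hold the lock per JVM. This
  // only works while both the trigger and the lock-remover operators are
  // scheduled into the same task manager, as the javadoc above notes.
  private static final Semaphore PERMIT = new Semaphore(1);

  static boolean tryLock() {
    return PERMIT.tryAcquire(); // non-blocking: false if a task already runs
  }

  static void unlock() {
    PERMIT.release();
  }
}
```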
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651099384

## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
## @@ -368,58 +431,63 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
       if (fs.exists(dst)) {
         throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
       }
-
-      if (!fs.rename(src, dst)) {
-        CommitFailedException cfe =
-            new CommitFailedException("Failed to commit changes using rename: %s", dst);
-        RuntimeException re = tryDelete(src);
-        if (re != null) {
-          cfe.addSuppressed(re);
-        }
-        throw cfe;
-      }
-    } catch (IOException e) {
-      CommitFailedException cfe =
-          new CommitFailedException(e, "Failed to commit changes using rename: %s", dst);
-      RuntimeException re = tryDelete(src);
-      if (re != null) {
-        cfe.addSuppressed(re);
+      if (!nextVersionIsLatest(nextVersion, fs)) {

Review Comment:
   @Fokko Also, I admit this check does not solve the problem in all scenarios at the moment; a better approach would be to call fs.rename and, after it succeeds, verify that the commit is intact (that is the part of the code I want to submit a PR for later). This check is just intended to fail the client as soon as possible, because many clients run flink/kafka-stream jobs that write to the table at the same time they are running the compaction task, and failing fast is friendlier to them.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
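The fail-fast idea is easy to see in isolation: before attempting the rename, the committer re-checks that the version it prepared is still the newest one and aborts immediately if a concurrent writer has moved the table ahead. A simplified sketch of that shape (illustrative only; `nextVersionIsLatest` is the helper introduced by the PR's diff, and the surrounding method is reduced to its essentials):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.exceptions.CommitFailedException;

// Simplified sketch of the fail-fast commit path discussed above; not the PR's exact code.
class FailFastCommitSketch {
  boolean commitNewVersion(FileSystem fs, Path src, Path dst, int nextVersion) throws IOException {
    if (fs.exists(dst)) {
      // Another writer already committed this exact version file.
      throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
    }
    if (!nextVersionIsLatest(nextVersion, fs)) {
      // A concurrent writer advanced the table; fail before renaming rather than
      // discovering the conflict after the metadata file is already in place.
      throw new CommitFailedException("Version %d too old: %s", nextVersion, dst);
    }
    return fs.rename(src, dst);
  }

  private boolean nextVersionIsLatest(int nextVersion, FileSystem fs) {
    // Placeholder: the PR implements this by re-reading the current table version.
    throw new UnsupportedOperationException("see the PR's implementation");
  }
}
```

As the comment itself concedes, the check is advisory rather than airtight: a writer can still sneak in between the check and the rename, so the outcome of the rename still has to be verified afterwards.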
Re: [PR] Core: FileSystemCatalogTable needs to skip file cleanup after task failure under some boundary conditions. [iceberg]
BsoBird commented on code in PR #9546: URL: https://github.com/apache/iceberg/pull/9546#discussion_r1651106651

## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
## @@ -355,71 +475,90 @@ int findVersion() {
    * an attempt will be made to delete the source file.
    *
    * @param fs the filesystem used for the rename
-   * @param src the source file
-   * @param dst the destination file
+   * @param tempMetaDataFile the source file
+   * @param finalMetaDataFile the destination file
    */
-  private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) {
+  @VisibleForTesting
+  boolean commitNewVersion(
+      FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile, Integer nextVersion)
+      throws IOException {
     try {
-      if (!lockManager.acquire(dst.toString(), src.toString())) {
+      if (!lockManager.acquire(finalMetaDataFile.toString(), finalMetaDataFile.toString())) {
         throw new CommitFailedException(
-            "Failed to acquire lock on file: %s with owner: %s", dst, src);
+            "Failed to acquire lock on file: %s with owner: %s",
+            finalMetaDataFile, tempMetaDataFile);
       }
-      if (fs.exists(dst)) {
-        throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
-      }
-
-      if (!fs.rename(src, dst)) {
-        CommitFailedException cfe =
-            new CommitFailedException("Failed to commit changes using rename: %s", dst);
-        RuntimeException re = tryDelete(src);
-        if (re != null) {
-          cfe.addSuppressed(re);
-        }
-        throw cfe;
+      if (fs.exists(finalMetaDataFile)) {
+        throw new CommitFailedException(
+            "Version %d already exists: %s", nextVersion, finalMetaDataFile);
       }
-    } catch (IOException e) {
-      CommitFailedException cfe =
-          new CommitFailedException(e, "Failed to commit changes using rename: %s", dst);
-      RuntimeException re = tryDelete(src);
-      if (re != null) {
-        cfe.addSuppressed(re);
+      // maybe too heavy.?
+      if (!nextVersionIsLatest(nextVersion, fs)) {
+        // In the case of concurrent execution,
+        // verify that the version that is ready to be committed at a time is the latest version.
+        throw new CommitFailedException("Version %d too old: %s", nextVersion, finalMetaDataFile);
       }
-      throw cfe;
+      return renameMetaDataFileAndCheck(fs, tempMetaDataFile, finalMetaDataFile);
     } finally {
-      if (!lockManager.release(dst.toString(), src.toString())) {
-        LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src);
+      if (!lockManager.release(finalMetaDataFile.toString(), finalMetaDataFile.toString())) {
+        LOG.warn(
+            "Failed to release lock on file: {} with owner: {}",
+            finalMetaDataFile,
+            tempMetaDataFile);
       }
     }
   }

-  /**
-   * Deletes the file from the file system. Any RuntimeException will be caught and returned.
-   *
-   * @param path the file to be deleted.
-   * @return RuntimeException caught, if any. null otherwise.
-   */
-  private RuntimeException tryDelete(Path path) {
-    try {
-      io().deleteFile(path.toString());
-      return null;
-    } catch (RuntimeException re) {
-      return re;
-    }
+  private void cleanUncommittedMeta(Path src) {
+    io().deleteFile(src.toString());
   }

   protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
     return Util.getFs(path, hadoopConf);
   }

+  @VisibleForTesting
+  boolean renameMetaDataFileAndCheck(FileSystem fs, Path tempMetaDataFile, Path finalMetaDataFile) {
+    try {
+      // The most important step. There must be no mistakes in this step.
+      // Even if it does, we should stop everything.
+      return renameMetaDataFile(fs, tempMetaDataFile, finalMetaDataFile);
+    } catch (Throwable e) {
+      LOG.error("There were some problems with submitting the new version.", e);
+      try {
+        if (newMetadataExists(fs, finalMetaDataFile) && !tempMetadataExists(fs, tempMetaDataFile)) {
+          return true;
+        } else {
+          throw new CommitFailedException(e);
+        }
+      } catch (CommitFailedException e1) {
+        throw e1;
+      } catch (Throwable e2) {
+        throw new CommitStateUnknownException(e2);
+      }
+    }
+  }
+
+  @VisibleForTesting

Review Comment:
   @Fokko Yes, but in practice many of these detailed scenarios are difficult to create locally; it is hard to simulate them. For example: halfway through a method call, every access to the filesystem suddenly starts throwing exceptions. I don't know how to simulate that scenario. In your example above, we mock a FileSystem object, but there are multiple calls to the FileSystem object in the testRenameWithFileSystem method, and the behaviour of the entire method is different aft
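One common way to get the kind of failure injection described above is to script a mock with a shared call counter, so the Nth filesystem access (wherever it happens inside the method under test) starts throwing. A hedged sketch using Mockito (the helper name and the shared counter are assumptions for illustration, not the PR's test code):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hedged sketch, not the PR's test code: a mocked FileSystem whose stubbed methods
// share one counter, so a method that calls the filesystem several times can be
// failed "halfway through" at any chosen point.
class FailingFileSystems {
  static FileSystem failAfter(int successfulCalls) throws IOException {
    AtomicInteger calls = new AtomicInteger();
    FileSystem fs = mock(FileSystem.class);
    when(fs.exists(any(Path.class)))
        .thenAnswer(inv -> {
          if (calls.incrementAndGet() > successfulCalls) {
            throw new IOException("injected failure");
          }
          return false; // pretend the destination does not exist yet
        });
    when(fs.rename(any(Path.class), any(Path.class)))
        .thenAnswer(inv -> {
          if (calls.incrementAndGet() > successfulCalls) {
            throw new IOException("injected failure");
          }
          return true; // pretend the rename succeeded
        });
    return fs;
  }
}
```

Running the method under test against failAfter(0), failAfter(1), and so on walks the failure point through every filesystem access in turn, which is roughly the per-call coverage the review discussion is asking about.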
Re: [PR] View Spec implementation [iceberg-rust]
c-thiel commented on code in PR #331: URL: https://github.com/apache/iceberg-rust/pull/331#discussion_r1651128747 ## crates/iceberg/testdata/view_metadata/ViewMetadataV2Valid.json: ## @@ -0,0 +1,58 @@ +{ Review Comment: Ok, thanks for the feedback @nastra. I'll add some more tests next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org