szehon-ho commented on code in PR #9461:
URL: https://github.com/apache/iceberg/pull/9461#discussion_r1467006116


##########
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java:
##########
@@ -309,65 +300,19 @@ protected enum CommitStatus {
    * @return Commit Status of Success, Failure or Unknown
    */
   protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
-    int maxAttempts =
-        PropertyUtil.propertyAsInt(
-            config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
-    long minWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
-    long maxWaitMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
-            COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
-    long totalRetryMs =
-        PropertyUtil.propertyAsLong(
-            config.properties(),
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
-            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
-
-    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
-
-    Tasks.foreach(newMetadataLocation)
-        .retry(maxAttempts)
-        .suppressFailureWhenFinished()
-        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
-        .onFailure(
-            (location, checkException) ->
-                LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
-        .run(
-            location -> {
-              TableMetadata metadata = refresh();
-              String currentMetadataFileLocation = metadata.metadataFileLocation();
-              boolean commitSuccess =
-                  currentMetadataFileLocation.equals(newMetadataLocation)
-                      || metadata.previousFiles().stream()
-                          .anyMatch(log -> log.file().equals(newMetadataLocation));
-              if (commitSuccess) {
-                LOG.info(
-                    "Commit status check: Commit to {} of {} succeeded",
-                    tableName(),
-                    newMetadataLocation);
-                status.set(CommitStatus.SUCCESS);
-              } else {
-                LOG.warn(
-                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
-                        + "or in history",
-                    tableName(),
-                    newMetadataLocation);
-              }
-            });
-
-    if (status.get() == CommitStatus.UNKNOWN) {
-      LOG.error(
-          "Cannot determine commit state to {}. Failed during checking {} times. "
-              + "Treating commit state as unknown.",
-          tableName(),
-          maxAttempts);
-    }
-    return status.get();
+    return MetastoreOperationsUtil.checkCommitStatus(

Review Comment:
   My question here is: does it have to be a util class?
   
   In Iceberg, shared code is usually implemented with abstract classes. For example, a BaseMetastoreOperations class could define the common methods, with BaseMetastoreTableOperations and BaseMetastoreViewOperations extending it. That way, those methods can be 'protected' instead of public.



##########
core/src/main/java/org/apache/iceberg/util/MetastoreOperationsUtil.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreOperationsUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreOperationsUtil.class);
+
+  private MetastoreOperationsUtil() {}
+
+  /**
+   * Attempt to load the table and see if any current or past metadata location matches the one we
+   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+   * may indicate the commit has failed but are not proof that this is the case. Past locations must
+   * also be searched on the chance that a second committer was able to successfully commit on top
+   * of our commit.
+   *
+   * @param tableName full name of the table
+   * @param newMetadataLocation the path of the new commit file
+   * @param properties properties for retry
+   * @param commitStatusSupplier calculate commit status with updated location
+   * @return Commit Status of Success, Failure or Unknown
+   */
+  public static BaseMetastoreTableOperations.CommitStatus checkCommitStatus(
+      String tableName,
+      String newMetadataLocation,
+      Map<String, String> properties,
+      Function<String, Boolean> commitStatusSupplier) {

Review Comment:
   How about passing in a method that just returns the current metadata location(s),
   i.e., a Supplier<List<String>> refreshMetadataLocations?
   
   Then the common logic could also do the comparison with the new metadata location.
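A minimal sketch of this suggestion, using `Supplier<List<String>>` as the callback type. The class name `CommitStatusCheckSketch`, its parameters, and the simplified backoff (doubling up to `maxWaitMs`, with no total-wait cap like `COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS` in the removed code) are assumptions for illustration. The caller only returns the current location followed by its history; the shared helper owns both the retry loop and the comparison.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the Supplier-based helper; names and parameters are illustrative.
final class CommitStatusCheckSketch {
  enum CommitStatus { SUCCESS, FAILURE, UNKNOWN }

  private CommitStatusCheckSketch() {}

  // The caller supplies only the locations (current first, then history); the helper owns
  // the retry loop and the comparison against the new metadata location.
  static CommitStatus checkCommitStatus(
      String newMetadataLocation,
      Supplier<List<String>> refreshMetadataLocations,
      int maxAttempts,
      long minWaitMs,
      long maxWaitMs) {
    long waitMs = minWaitMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        List<String> locations = refreshMetadataLocations.get();
        // A refresh that succeeds ends the check; a non-match is UNKNOWN, not retried.
        return locations.contains(newMetadataLocation)
            ? CommitStatus.SUCCESS
            : CommitStatus.UNKNOWN;
      } catch (RuntimeException e) {
        if (attempt < maxAttempts) {
          try {
            Thread.sleep(waitMs); // exponential backoff with factor 2, capped at maxWaitMs
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return CommitStatus.UNKNOWN;
          }
          waitMs = Math.min(waitMs * 2, maxWaitMs);
        }
      }
    }
    return CommitStatus.UNKNOWN;
  }
}
```

For tables, the supplier would wrap what the removed code already does: call refresh(), then return metadataFileLocation() followed by the file() of each previousFiles() entry.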
   
   


