szehon-ho commented on code in PR #9461:
URL: https://github.com/apache/iceberg/pull/9461#discussion_r1479132215


##########
core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseMetastoreOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);
+
+  /**
+   * Attempt to load the table and see if any current or past metadata location matches the one we
+   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+   * may indicate the commit has failed but are not proof that this is the case. Past locations must
+   * also be searched on the chance that a second committer was able to successfully commit on top
+   * of our commit.
+   *
+   * @param tableName full name of the table
+   * @param newMetadataLocation the path of the new commit file
+   * @param properties properties for retry
+   * @param commitStatusSupplier calculate commit status with updated location
+   * @return Commit Status of Success, Failure or Unknown
+   */
+  protected BaseMetastoreTableOperations.CommitStatus checkCommitStatus(
+      String tableName,
+      String newMetadataLocation,
+      Map<String, String> properties,
+      Function<String, Boolean> commitStatusSupplier) {
+    int maxAttempts =
+        PropertyUtil.propertyAsInt(
+            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
+    long minWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
+    long maxWaitMs =
+        PropertyUtil.propertyAsLong(
+            properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
+    long totalRetryMs =
+        PropertyUtil.propertyAsLong(
+            properties,
+            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
+            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
+
+    AtomicReference<BaseMetastoreTableOperations.CommitStatus> status =
+        new AtomicReference<>(BaseMetastoreTableOperations.CommitStatus.UNKNOWN);

Review Comment:
   Can we move CommitStatus to this class and make it protected?
   
   That way BaseMetastoreTableOperations would not need to make it public.
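A minimal sketch of the suggested layout, assuming CommitStatus becomes a protected nested enum of the shared base class. The names `MetastoreOperationsSketch`, `statusFor`, and `HiveLikeOperations` are hypothetical and are not Iceberg classes or methods. The decision rule mirrors the Javadoc in the diff: a commit counts as successful if the new metadata location is the current one or any earlier one, since a second committer may already have committed on top of it.

```java
import java.util.List;

/** Hypothetical stand-in for BaseMetastoreOperations; not Iceberg's actual class. */
public class MetastoreOperationsSketch {

  // Protected nested enum: visible to subclasses in any package without becoming public API.
  protected enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  /**
   * Decides the status from locations observed after a successful refresh. The history is
   * searched too, because another writer may have committed on top of our commit. UNKNOWN is
   * left for callers whose lookup itself fails; this helper only handles a completed lookup.
   */
  protected static CommitStatus statusFor(
      String newMetadataLocation, String currentLocation, List<String> previousLocations) {
    if (newMetadataLocation.equals(currentLocation)
        || previousLocations.contains(newMetadataLocation)) {
      return CommitStatus.SUCCESS;
    }
    return CommitStatus.FAILURE;
  }

  /** Hypothetical subclass: it can use the protected enum even though the enum is not public. */
  static class HiveLikeOperations extends MetastoreOperationsSketch {
    CommitStatus check(String location) {
      return statusFor(
          location,
          "s3://bucket/t/metadata/v3.json",
          List.of("s3://bucket/t/metadata/v1.json", "s3://bucket/t/metadata/v2.json"));
    }
  }

  public static void main(String[] args) {
    HiveLikeOperations ops = new HiveLikeOperations();
    System.out.println(ops.check("s3://bucket/t/metadata/v2.json")); // SUCCESS
    System.out.println(ops.check("s3://bucket/t/metadata/v9.json")); // FAILURE
  }
}
```

Keeping the enum nested and protected means table and view operations that extend the base class share one type, while external callers never see it.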



##########
core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java:
##########
@@ -0,0 +1,112 @@
+public class BaseMetastoreOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);
+
+  /**
+   * Attempt to load the table and see if any current or past metadata location matches the one we
+   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+   * may indicate the commit has failed but are not proof that this is the case. Past locations must

Review Comment:
   Nit: 'are not proof' => 'don't have proof'
   
   How about 'Past locations must ...' => 'Note that all previous locations must ...'



##########
core/src/main/java/org/apache/iceberg/util/MetastoreOperationsUtil.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastoreOperationsUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(MetastoreOperationsUtil.class);
+
+  private MetastoreOperationsUtil() {}
+
+  /**
+   * Attempt to load the table and see if any current or past metadata location matches the one we
+   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+   * may indicate the commit has failed but are not proof that this is the case. Past locations must
+   * also be searched on the chance that a second committer was able to successfully commit on top
+   * of our commit.
+   *
+   * @param tableName full name of the table
+   * @param newMetadataLocation the path of the new commit file
+   * @param properties properties for retry
+   * @param commitStatusSupplier calculate commit status with updated location
+   * @return Commit Status of Success, Failure or Unknown
+   */
+  public static BaseMetastoreTableOperations.CommitStatus checkCommitStatus(
+      String tableName,
+      String newMetadataLocation,
+      Map<String, String> properties,
+      Function<String, Boolean> commitStatusSupplier) {

Review Comment:
   I meant something like:
   
   ```
   
   checkCommitStatus(String newMetadataLocation, TableMetadata config, Supplier<List<String>> loadMetadataLocations)
   
       ... // existing common code
   
       Tasks.foreach(newMetadataLocation)
           .retry(maxAttempts)
           .suppressFailureWhenFinished()
           .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
           .onFailure(
               (location, checkException) ->
                   LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
           .run(
               location -> {
                 List<String> allMetadataLocations = loadMetadataLocations.get();
                 boolean commitSuccess = allMetadataLocations.contains(newMetadataLocation);
   
                 if (commitSuccess) {
                   ...
                   status.set(CommitStatus.SUCCESS);
                 } else {
                   ...
                 }
               });
   
         ... // rest of existing common code
   ```
   
   That way, we can just have the different implementations like:
   
   
   ```
   HiveTableOperations {
      List<String> loadMetadataLocations() {
          TableMetadata metadata = refresh();
          return ImmutableList.<String>builder()
              .add(metadata.metadataFileLocation())
              .addAll(metadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file).iterator())
              .build();
      }
   }
   
   ViewTableOperations {
      List<String> loadMetadataLocations() {
          ViewMetadata metadata = refresh();
          return ImmutableList.<String>builder()
              .add(metadata.metadataFileLocation())
              .addAll(metadata.previousFiles())
              .build();
      }
   }
   
   ```
   
   and pass this::loadMetadataLocations into the checkCommitStatus() call. Is that clearer? This allows us to share more code if we don't go with the common interface for the View and Table hierarchies.
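The proposal above can be sketched as self-contained Java. This is a hedged sketch, not Iceberg code: a plain loop with exponential backoff stands in for Iceberg's Tasks builder, and `CommitStatusChecker` and its parameters are hypothetical names. It takes a `java.util.function.Supplier<List<String>>` because `Runnable` cannot return a value. The only caller-specific piece is that supplier, so table and view operations would differ only in how they list their metadata locations.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper; the loop below approximates, but is not, Iceberg's Tasks retry logic. */
public class CommitStatusChecker {

  enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  /**
   * Retries the lookup with exponential backoff. Returns SUCCESS or FAILURE as soon as one lookup
   * completes; returns UNKNOWN when no attempt completes (every attempt throws, the time budget
   * runs out, or the thread is interrupted).
   */
  static CommitStatus checkCommitStatus(
      String newMetadataLocation,
      Supplier<List<String>> loadMetadataLocations,
      int maxAttempts,
      long minWaitMs,
      long maxWaitMs,
      long totalRetryMs) {
    long deadline = System.currentTimeMillis() + totalRetryMs;
    long waitMs = minWaitMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        List<String> allMetadataLocations = loadMetadataLocations.get();
        return allMetadataLocations.contains(newMetadataLocation)
            ? CommitStatus.SUCCESS
            : CommitStatus.FAILURE;
      } catch (RuntimeException e) {
        System.err.printf(
            "Cannot check if commit to %s exists (attempt %d)%n", newMetadataLocation, attempt);
      }
      // Stop if attempts are exhausted or the next sleep would exceed the total budget.
      if (attempt == maxAttempts || System.currentTimeMillis() + waitMs > deadline) {
        break;
      }
      try {
        Thread.sleep(waitMs);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
      waitMs = Math.min(waitMs * 2, maxWaitMs); // backoff factor 2.0, capped at maxWaitMs
    }
    return CommitStatus.UNKNOWN;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // The first lookup fails (a transient metastore error); the second sees the new location.
    Supplier<List<String>> flaky =
        () -> {
          if (calls[0]++ == 0) {
            throw new IllegalStateException("metastore unavailable");
          }
          return List.of("v2.metadata.json", "v1.metadata.json");
        };
    System.out.println(checkCommitStatus("v2.metadata.json", flaky, 3, 10, 100, 1_000)); // SUCCESS
  }
}
```

A caller would pass a method reference such as `this::loadMetadataLocations`, so the retry and status bookkeeping live in one place.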



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -166,153 +163,58 @@ protected void doRefresh() {
     refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean newTable = base == null;
     String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
-    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
-    boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
-
-    CommitStatus commitStatus = CommitStatus.FAILURE;
-    boolean updateHiveTable = false;
-
     HiveLock lock = lockObject(metadata);

Review Comment:
   Is it possible to put
   ```
   HiveLock lock = lockObject(metadata);
   try {
     ...
   } finally {
     lock.unlock();
   }
   ```
   
   in the common method as well?
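One way to read this suggestion is a template method in the shared base class that owns the lock lifecycle, so subclasses only supply the metastore-specific steps. A hedged sketch in plain Java: `LockingCommitTemplate`, `Lock`, `createLock`, and `persist` are hypothetical stand-ins, not Iceberg's HiveLock API. The lock is declared before the try block so the finally clause can reach it, and unlock() runs whether the commit succeeds or throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class; not Iceberg's BaseMetastoreOperations. */
public abstract class LockingCommitTemplate {

  /** Hypothetical lock abstraction, standing in for a metastore-specific lock. */
  interface Lock {
    void lock();

    void unlock();
  }

  /** Subclass hook: create (but do not yet acquire) the lock for this table or view. */
  protected abstract Lock createLock();

  /** Subclass hook: metastore-specific commit work performed while the lock is held. */
  protected abstract void persist(String newMetadataLocation);

  /** Common method: lock handling is written once here instead of in every doCommit. */
  public final void commit(String newMetadataLocation) {
    Lock lock = createLock(); // declared outside try so the finally block can see it
    try {
      lock.lock();
      persist(newMetadataLocation);
    } finally {
      lock.unlock(); // always released, even if lock() or persist() throws
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    LockingCommitTemplate failing =
        new LockingCommitTemplate() {
          @Override
          protected Lock createLock() {
            return new Lock() {
              @Override
              public void lock() {
                events.add("lock");
              }

              @Override
              public void unlock() {
                events.add("unlock");
              }
            };
          }

          @Override
          protected void persist(String location) {
            events.add("persist " + location);
            throw new IllegalStateException("metastore rejected commit");
          }
        };
    try {
      failing.commit("v3.metadata.json");
    } catch (IllegalStateException expected) {
      events.add("caught");
    }
    System.out.println(events); // [lock, persist v3.metadata.json, unlock, caught]
  }
}
```

Making commit() final keeps subclasses from bypassing the release, which is the main benefit of moving the try/finally into the shared method.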



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

