This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch LeaderExecutor_Feature
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/LeaderExecutor_Feature by this push:
     new 38ddeff9aa HDDS-11403. granular lock and cache (#7271)
38ddeff9aa is described below

commit 38ddeff9aa96cc2a08cb001c9735fe2bef6e0a71
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Oct 11 21:48:02 2024 +0530

    HDDS-11403. granular lock and cache (#7271)
---
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |   5 +-
 .../hadoop/hdds/utils/db/cache/NoTableCache.java   | 102 +++++
 .../hadoop/hdds/utils/db/cache/TableCache.java     |   5 +++--
 .../hadoop/hdds/utils/db/cache/TestTableCache.java |  27 +-
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |   4 +-
 .../om/snapshot/TestOzoneManagerHASnapshot.java    |   5 +-
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   |   5 -
 .../src/main/proto/OmClientProtocol.proto          |   2 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  54 ++-
 .../hadoop/ozone/om/lock/FSOPrefixLocking.java     | 239 ++++++++++++
 .../apache/hadoop/ozone/om/lock/KeyLocking.java    | 135 +++++
 .../org/apache/hadoop/ozone/om/lock/OmLockOpr.java | 411 +++++++++++++++++++++
 .../hadoop/ozone/om/lock/OmRequestLockUtils.java   | 225 +++++++++++
 .../om/ratis/execution/BucketQuotaResource.java    |  78 ++++
 .../ratis/execution/FollowerRequestExecutor.java   |  19 +-
 .../om/ratis/execution/LeaderRequestExecutor.java  | 212 ++++++++---
 .../om/ratis/execution/OMBasicStateMachine.java    |  16 +-
 .../hadoop/ozone/om/ratis/execution/OMGateway.java | 166 ++++-----
 .../ratis/execution/OmBucketInfoQuotaTracker.java  | 111 ++++++
 .../ozone/om/ratis/execution/PoolExecutor.java     |  26 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   9 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |   4 +
 .../ozone/om/request/OMPersistDbRequest.java       |  88 ++++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  14 +-
 .../S3MultipartUploadCompleteRequest.java          |  10 -
 .../ozone/om/request/upgrade/OMPrepareRequest.java |  22 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   5 +-
 .../om/service/TestDirectoryDeletingService.java   |   2 +-
 28 files changed, 1755 insertions(+), 246 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index b8c4fe9b0a..f04e45254c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheResult;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.hdds.utils.db.cache.FullTableCache;
+import org.apache.hadoop.hdds.utils.db.cache.NoTableCache;
 import org.apache.hadoop.hdds.utils.db.cache.PartialTableCache;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
@@ -125,6 +126,8 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
               CacheValue.get(EPOCH_DEFAULT, kv.getValue()));
         }
       }
+    } else if (cacheType == CacheType.NO_CACHE) {
+      cache = new NoTableCache<>();
     } else {
       cache = new PartialTableCache<>(threadNamePrefix);
     }
@@ -549,7 +552,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   }
 
   @VisibleForTesting
-  public TableCache<KEY, VALUE> getCache() {
+  TableCache<KEY, VALUE> getCache() {
     return cache;
   }
 
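For context, a simplified sketch (not the actual TypedTable code; getFromTable stands in for the RocksDB read) of how a typed read treats the three cache outcomes. The NoTableCache added below always answers MAY_EXIST, which forces every read through to the DB:

    // Hedged sketch: dispatch on the cache lookup result.
    CacheResult<VALUE> result = cache.lookup(new CacheKey<>(key));
    switch (result.getCacheStatus()) {
    case EXISTS:
      return result.getValue().getCacheValue();  // served from cache
    case NOT_EXIST:
      return null;                               // known absent, skip the DB
    default:                                     // MAY_EXIST
      return getFromTable(key);                  // fall through to RocksDB
    }
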
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/NoTableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/NoTableCache.java
new file mode 100644
index 0000000000..c96c9b0b36
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/NoTableCache.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.utils.db.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
+import org.apache.hadoop.hdds.annotation.InterfaceStability.Evolving;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * No cache type implementation.
+ * @param <KEY>
+ * @param <VALUE>
+ */
+@Private
+@Evolving
+public class NoTableCache<KEY, VALUE> implements TableCache<KEY, VALUE> {
+  public static final Logger LOG = LoggerFactory.getLogger(NoTableCache.class);
+
+  public NoTableCache() {
+  }
+
+  @Override
+  public CacheValue<VALUE> get(CacheKey<KEY> cachekey) {
+    return null;
+  }
+
+  @Override
+  public void loadInitial(CacheKey<KEY> key, CacheValue<VALUE> value) {
+    // Do nothing: no cache is maintained.
+  }
+
+  @Override
+  public void put(CacheKey<KEY> cacheKey, CacheValue<VALUE> value) {
+  }
+
+  @Override
+  public void cleanup(List<Long> epochs) {
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> iterator() {
+    Map<CacheKey<KEY>, CacheValue<VALUE>> emptyMap = Collections.emptyMap();
+    return emptyMap.entrySet().iterator();
+  }
+
+  @VisibleForTesting
+  @Override
+  public void evictCache(List<Long> epochs) {
+  }
+
+  @Override
+  public CacheResult<VALUE> lookup(CacheKey<KEY> cachekey) {
+    return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST, null);
+  }
+
+  @VisibleForTesting
+  @Override
+  public NavigableMap<Long, Set<CacheKey<KEY>>> getEpochEntries() {
+    return new ConcurrentSkipListMap<>();
+  }
+
+  @Override
+  public CacheStats getStats() {
+    return new CacheStats(0, 0, 0);
+  }
+
+  @Override
+  public CacheType getCacheType() {
+    return CacheType.NO_CACHE;
+  }
+}
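
A minimal sketch of the contract this null-object cache provides (key, value and epoch are illustrative):

    NoTableCache<String, OmKeyInfo> cache = new NoTableCache<>();
    cache.put(new CacheKey<>("/vol/bucket/key"), CacheValue.get(1L, keyInfo)); // no-op
    assert cache.get(new CacheKey<>("/vol/bucket/key")) == null;  // nothing is retained
    assert cache.lookup(new CacheKey<>("/vol/bucket/key")).getCacheStatus()
        == CacheResult.CacheStatus.MAY_EXIST;                     // caller must read the DB
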
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
index 62de974eac..c2b9541c8f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
@@ -116,7 +116,8 @@ public interface TableCache<KEY, VALUE> {
   enum CacheType {
     FULL_CACHE, //  This mean's the table maintains full cache. Cache and DB
     // state are same.
-    PARTIAL_CACHE // This is partial table cache, cache state is partial state
-    // compared to DB state.
+    PARTIAL_CACHE, // This is partial table cache, cache state is partial state
+    // compared to DB state.
+    NO_CACHE // no cache maintained; reads always fall through to the DB
   }
 }
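
A hedged sketch of choosing a cache type per table; the four-argument getTable overload is the one OmMetadataManagerImpl uses later in this patch:

    Table<String, OmKeyInfo> keyTable =
        store.getTable(KEY_TABLE, String.class, OmKeyInfo.class, CacheType.NO_CACHE);

The test change below switches from @EnumSource to an explicit @MethodSource for the same reason: NO_CACHE retains nothing, so the existing assertions on cache contents only apply to FULL_CACHE and PARTIAL_CACHE.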
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
index 3f645f90e7..5c8c313ffd 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
@@ -27,10 +27,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import java.util.stream.Stream;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.event.Level;
 
 
@@ -54,8 +55,14 @@ public class TestTableCache {
     }
   }
 
+  static Stream<TableCache.CacheType> tableCacheTypes() {
+    return Stream.of(
+        TableCache.CacheType.FULL_CACHE,
+        TableCache.CacheType.PARTIAL_CACHE
+    );
+  }
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testPartialTableCache(TableCache.CacheType cacheType) {
 
     createTableCache(cacheType);
@@ -99,7 +106,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testTableCacheWithRenameKey(TableCache.CacheType cacheType) {
 
     createTableCache(cacheType);
@@ -155,7 +162,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testPartialTableCacheWithNotContinuousEntries(
       TableCache.CacheType cacheType) {
 
@@ -206,7 +213,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testPartialTableCacheWithOverrideEntries(
       TableCache.CacheType cacheType) {
 
@@ -277,7 +284,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testPartialTableCacheWithOverrideAndDelete(
       TableCache.CacheType cacheType) {
 
@@ -374,7 +381,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testPartialTableCacheParallel(
       TableCache.CacheType cacheType) throws Exception {
 
@@ -458,7 +465,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testTableCache(TableCache.CacheType cacheType) {
 
     createTableCache(cacheType);
@@ -491,7 +498,7 @@ public class TestTableCache {
 
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testTableCacheWithNonConsecutiveEpochList(
       TableCache.CacheType cacheType) {
 
@@ -562,7 +569,7 @@ public class TestTableCache {
   }
 
   @ParameterizedTest
-  @EnumSource(TableCache.CacheType.class)
+  @MethodSource("tableCacheTypes")
   public void testTableCacheStats(TableCache.CacheType cacheType) {
 
     createTableCache(cacheType);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index 5a83f6dbba..7c911ae3e9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -44,7 +44,7 @@ import com.google.common.base.Preconditions;
 /**
  * A class that encapsulates Bucket Info.
  */
-public final class OmBucketInfo extends WithObjectID implements Auditable, CopyObject<OmBucketInfo> {
+public class OmBucketInfo extends WithObjectID implements Auditable, CopyObject<OmBucketInfo> {
   private static final Codec<OmBucketInfo> CODEC = new DelegatedCodec<>(
       Proto2Codec.get(BucketInfo.getDefaultInstance()),
       OmBucketInfo::getFromProtobuf,
@@ -110,7 +110,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable, CopyO
 
   private String owner;
 
-  private OmBucketInfo(Builder b) {
+  protected OmBucketInfo(Builder b) {
     super(b);
     this.volumeName = b.volumeName;
     this.bucketName = b.bucketName;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java
index 12c5d1e61e..447034f256 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java
@@ -108,7 +108,6 @@ public class TestOzoneManagerHASnapshot {
 
   // Test snapshot diff when OM restarts in HA OM env.
   @Test
-  @Unhealthy("HDDS-11415 follower cache update")
   public void testSnapshotDiffWhenOmLeaderRestart()
       throws Exception {
     String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
@@ -165,7 +164,6 @@ public class TestOzoneManagerHASnapshot {
   }
 
   @Test
-  @Unhealthy("HDDS-11415 follower cache update")
   public void testSnapshotIdConsistency() throws Exception {
     createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
 
@@ -203,7 +201,6 @@ public class TestOzoneManagerHASnapshot {
    * passed or empty.
    */
   @Test
-  @Unhealthy("HDDS-11415 follower cache update")
   public void testSnapshotNameConsistency() throws Exception {
     store.createSnapshot(volumeName, bucketName, "");
     List<OzoneManager> ozoneManagers = cluster.getOzoneManagersList();
@@ -286,7 +283,7 @@ public class TestOzoneManagerHASnapshot {
    * and purgeSnapshot in same batch.
    */
   @Test
-  @Unhealthy("HDDS-11415 om statemachine change and follower cache update")
+  @Unhealthy("HDDS-11415 om statemachine change, remove")
  public void testKeyAndSnapshotDeletionService() throws IOException, InterruptedException, TimeoutException {
     OzoneManager omLeader = cluster.getOMLeader();
     OzoneManager omFollower;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index f253f79935..5d64750714 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantCreateRequest;
 import org.apache.hadoop.ozone.shell.tenant.TenantShell;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
-import org.apache.ozone.test.tag.Unhealthy;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Timeout;
@@ -357,7 +356,6 @@ public class TestOzoneTenantShell {
   }
 
   @Test
-  @Unhealthy("HDDS-11415 follower cache issue validation fail")
   public void testAssignAdmin() throws IOException {
 
     final String tenantName = "devaa";
@@ -413,7 +411,6 @@ public class TestOzoneTenantShell {
    * and revoke user flow.
    */
   @Test
-  @Unhealthy("HDDS-11415 follower cache issue validation fail")
   @SuppressWarnings("methodlength")
   public void testOzoneTenantBasicOperations() throws IOException {
 
@@ -688,7 +685,6 @@ public class TestOzoneTenantShell {
   }
 
   @Test
-  @Unhealthy("HDDS-11415 follower cache issue validation fail")
   public void testListTenantUsers() throws IOException {
     executeHA(tenantShell, new String[] {"--verbose", "create", "tenant1"});
     checkOutput(out, "{\n" +
@@ -769,7 +765,6 @@ public class TestOzoneTenantShell {
   }
 
   @Test
-  @Unhealthy("HDDS-11415 follower cache issue validation fail")
   public void testTenantSetSecret() throws IOException, InterruptedException {
 
     final String tenantName = "tenant-test-set-secret";
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index da99dba59e..09cf20cf4f 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -2235,6 +2235,7 @@ message BucketQuotaCount {
     required int64 diffUsedBytes = 3;
     required int64 diffUsedNamespace = 4;
     required bool supportOldQuota = 5 [default=false];
+    optional uint64 bucketObjectId = 6;
 }
 
 message QuotaRepairResponse {
@@ -2264,6 +2265,7 @@ message OMLockDetailsProto {
 message PersistDbRequest {
     repeated DBTableUpdate tableUpdates = 1;
     repeated int64 index = 2;
+    repeated BucketQuotaCount bucketQuotaCount = 3;
 }
 message DBTableUpdate {
     required string tableName = 1;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 4873a7db49..e46844fb0e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -329,6 +329,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   private SnapshotChainManager snapshotChainManager;
   private final OMPerformanceMetrics perfMetrics;
   private final S3Batcher s3Batcher = new S3SecretBatcher();
+  private boolean isLeaderExecutorEnabled = false;
 
   /**
    * OmMetadataManagerImpl constructor.
@@ -359,6 +360,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
     // For test purpose only
     ignorePipelineinKey = conf.getBoolean(
         "ozone.om.ignore.pipeline", Boolean.TRUE);
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
     start(conf);
   }
 
@@ -370,6 +373,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
     this.lock = new OzoneManagerLock(conf);
     this.omEpoch = 0;
     perfMetrics = null;
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
   }
 
   public static OmMetadataManagerImpl createCheckpointMetadataManager(
@@ -401,6 +406,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
       throws IOException {
     lock = new OmReadOnlyLock();
     omEpoch = 0;
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
     setStore(loadDB(conf, dir, name, true,
         java.util.Optional.of(Boolean.TRUE), Optional.empty()));
     initializeOmTables(CacheType.PARTIAL_CACHE, false);
@@ -414,6 +421,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
     try {
       lock = new OmReadOnlyLock();
       omEpoch = 0;
+      isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+          OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
       String snapshotDir = OMStorage.getOmDbDir(conf) +
           OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR;
       File metaDir = new File(snapshotDir);
@@ -683,9 +692,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   protected void initializeOmTables(CacheType cacheType,
                                     boolean addCacheMetrics)
       throws IOException {
+    CacheType defaultCacheType = CacheType.PARTIAL_CACHE;
+    if (isLeaderExecutorEnabled) {
+      // TODO HDDS-11415: change PARTIAL_CACHE to NO_CACHE; PARTIAL_CACHE is kept for now so test cases pass
+      defaultCacheType = CacheType.PARTIAL_CACHE;
+    }
     userTable =
         this.store.getTable(USER_TABLE, String.class,
-            PersistedUserVolumeInfo.class);
+            PersistedUserVolumeInfo.class, defaultCacheType);
     checkTableStatus(userTable, USER_TABLE, addCacheMetrics);
 
     volumeTable =
@@ -699,92 +713,92 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
 
     checkTableStatus(bucketTable, BUCKET_TABLE, addCacheMetrics);
 
-    keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
+    keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class, defaultCacheType);
     checkTableStatus(keyTable, KEY_TABLE, addCacheMetrics);
 
     deletedTable = this.store.getTable(DELETED_TABLE, String.class,
-        RepeatedOmKeyInfo.class);
+        RepeatedOmKeyInfo.class, defaultCacheType);
     checkTableStatus(deletedTable, DELETED_TABLE, addCacheMetrics);
 
     openKeyTable =
         this.store.getTable(OPEN_KEY_TABLE, String.class,
-            OmKeyInfo.class);
+            OmKeyInfo.class, defaultCacheType);
     checkTableStatus(openKeyTable, OPEN_KEY_TABLE, addCacheMetrics);
 
     multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
-        String.class, OmMultipartKeyInfo.class);
+        String.class, OmMultipartKeyInfo.class, defaultCacheType);
     checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE, addCacheMetrics);
 
     dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
-        OzoneTokenIdentifier.class, Long.class);
+        OzoneTokenIdentifier.class, Long.class, defaultCacheType);
     checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE, addCacheMetrics);
 
     s3SecretTable = this.store.getTable(S3_SECRET_TABLE, String.class,
-        S3SecretValue.class);
+        S3SecretValue.class, defaultCacheType);
     checkTableStatus(s3SecretTable, S3_SECRET_TABLE, addCacheMetrics);
 
     prefixTable = this.store.getTable(PREFIX_TABLE, String.class,
-        OmPrefixInfo.class);
+        OmPrefixInfo.class, defaultCacheType);
     checkTableStatus(prefixTable, PREFIX_TABLE, addCacheMetrics);
 
     dirTable = this.store.getTable(DIRECTORY_TABLE, String.class,
-            OmDirectoryInfo.class);
+            OmDirectoryInfo.class, defaultCacheType);
     checkTableStatus(dirTable, DIRECTORY_TABLE, addCacheMetrics);
 
     fileTable = this.store.getTable(FILE_TABLE, String.class,
-            OmKeyInfo.class);
+            OmKeyInfo.class, defaultCacheType);
     checkTableStatus(fileTable, FILE_TABLE, addCacheMetrics);
 
     openFileTable = this.store.getTable(OPEN_FILE_TABLE, String.class,
-            OmKeyInfo.class);
+            OmKeyInfo.class, defaultCacheType);
     checkTableStatus(openFileTable, OPEN_FILE_TABLE, addCacheMetrics);
 
     deletedDirTable = this.store.getTable(DELETED_DIR_TABLE, String.class,
-        OmKeyInfo.class);
+        OmKeyInfo.class, defaultCacheType);
     checkTableStatus(deletedDirTable, DELETED_DIR_TABLE, addCacheMetrics);
 
     transactionInfoTable = this.store.getTable(TRANSACTION_INFO_TABLE,
-        String.class, TransactionInfo.class);
+        String.class, TransactionInfo.class, defaultCacheType);
     checkTableStatus(transactionInfoTable, TRANSACTION_INFO_TABLE,
         addCacheMetrics);
 
-    metaTable = this.store.getTable(META_TABLE, String.class, String.class);
+    metaTable = this.store.getTable(META_TABLE, String.class, String.class, defaultCacheType);
     checkTableStatus(metaTable, META_TABLE, addCacheMetrics);
 
     // accessId -> OmDBAccessIdInfo (tenantId, secret, Kerberos principal)
     tenantAccessIdTable = this.store.getTable(TENANT_ACCESS_ID_TABLE,
-        String.class, OmDBAccessIdInfo.class);
+        String.class, OmDBAccessIdInfo.class, defaultCacheType);
     checkTableStatus(tenantAccessIdTable, TENANT_ACCESS_ID_TABLE,
         addCacheMetrics);
 
     // User principal -> OmDBUserPrincipalInfo (a list of accessIds)
     principalToAccessIdsTable = this.store.getTable(
         PRINCIPAL_TO_ACCESS_IDS_TABLE,
-        String.class, OmDBUserPrincipalInfo.class);
+        String.class, OmDBUserPrincipalInfo.class, defaultCacheType);
     checkTableStatus(principalToAccessIdsTable, PRINCIPAL_TO_ACCESS_IDS_TABLE,
         addCacheMetrics);
 
     // tenant name -> tenant (tenant states)
     tenantStateTable = this.store.getTable(TENANT_STATE_TABLE,
-        String.class, OmDBTenantState.class);
+        String.class, OmDBTenantState.class, defaultCacheType);
     checkTableStatus(tenantStateTable, TENANT_STATE_TABLE, addCacheMetrics);
 
     // TODO: [SNAPSHOT] Consider FULL_CACHE for snapshotInfoTable since
     //  exclusiveSize in SnapshotInfo can be frequently updated.
     // path -> snapshotInfo (snapshot info for snapshot)
     snapshotInfoTable = this.store.getTable(SNAPSHOT_INFO_TABLE,
-        String.class, SnapshotInfo.class);
+        String.class, SnapshotInfo.class, defaultCacheType);
     checkTableStatus(snapshotInfoTable, SNAPSHOT_INFO_TABLE, addCacheMetrics);
 
     // volumeName/bucketName/objectID -> renamedKey or renamedDir
     snapshotRenamedTable = this.store.getTable(SNAPSHOT_RENAMED_TABLE,
-        String.class, String.class);
+        String.class, String.class, defaultCacheType);
     checkTableStatus(snapshotRenamedTable, SNAPSHOT_RENAMED_TABLE,
         addCacheMetrics);
     // TODO: [SNAPSHOT] Initialize table lock for snapshotRenamedTable.
 
     compactionLogTable = this.store.getTable(COMPACTION_LOG_TABLE,
-        String.class, CompactionLogEntry.class);
+        String.class, CompactionLogEntry.class, defaultCacheType);
     checkTableStatus(compactionLogTable, COMPACTION_LOG_TABLE,
         addCacheMetrics);
   }
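
A minimal sketch of flipping the flag that drives defaultCacheType above (the literal key string behind OZONE_OM_LEADER_EXECUTOR_ENABLE is not shown in this diff, so the constant is referenced instead):

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE, true);
    // All OM tables are then created with defaultCacheType:
    // PARTIAL_CACHE for now, NO_CACHE once HDDS-11415 lands.
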
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/FSOPrefixLocking.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/FSOPrefixLocking.java
new file mode 100644
index 0000000000..a3fd8dbd35
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/FSOPrefixLocking.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+
+/**
+ * Prefix locking for FSO.
+ */
+public class FSOPrefixLocking {
+  private static final int LRU_CACHE_MIN_SIZE = 1000;
+  private static final int LRU_TRY_REMOVE_PERCENT = 10;
+  private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
+  private final ScheduledExecutorService executorService;
+  private final AtomicLong writeLockCnt = new AtomicLong();
+  private final AtomicLong readLockCnt = new AtomicLong();
+
+  private LockInfo lockInfo = new LockInfo(null, null);
+  private NavigableSet<LockInfo> lru = new ConcurrentSkipListSet<>();
+
+  public FSOPrefixLocking(String threadPrefix) {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat(threadPrefix + "FSOPrefixLocking-Cleanup-%d").build();
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    executorService.scheduleWithFixedDelay(() -> cleanupTask(), 1000L, 1000L, TimeUnit.MILLISECONDS);
+  }
+  
+  public void stop() {
+    executorService.shutdown();
+  }
+  
+  private void cleanupTask() {
+    // most recent will be leaf node
+    int size = lru.size();
+    if (size > LRU_CACHE_MIN_SIZE) {
+      int count = size / LRU_TRY_REMOVE_PERCENT;
+      for (int i = 0; i < count; ++i) {
+        LockInfo tmpLockInfo = lru.pollLast();
+        // try to remove; if not successful, add back
+        try {
+          boolean b = tmpLockInfo.removeSelf();
+          if (!b) {
+            lru.add(tmpLockInfo);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+  
+  public List<LockInfo> readLock(List<String> keyElements) throws OMException {
+    List<LockInfo> acquiredLocks = new ArrayList<>(keyElements.size());
+    LockInfo tmpLockInfo = lockInfo;
+    for (String path : keyElements) {
+      tmpLockInfo = tmpLockInfo.addPath(path);
+      tmpLockInfo.readLock();
+      acquiredLocks.add(tmpLockInfo);
+      readLockCnt.incrementAndGet();
+    }
+    return acquiredLocks;
+  }
+
+  public void readUnlock(List<LockInfo> acquiredLocks) {
+    ListIterator<LockInfo> li = acquiredLocks.listIterator(acquiredLocks.size());
+    while (li.hasPrevious()) {
+      LockInfo previous = li.previous();
+      previous.readUnlock();
+      updateLru(previous);
+      readLockCnt.decrementAndGet();
+    }
+  }
+
+  public LockInfo writeLock(LockInfo prvLockInfo, String name) throws IOException {
+    if (null == prvLockInfo) {
+      prvLockInfo = lockInfo;
+    }
+    prvLockInfo = prvLockInfo.addPath(name);
+    prvLockInfo.writeLock();
+    writeLockCnt.incrementAndGet();
+    return prvLockInfo;
+  }
+
+  public void writeUnlock(LockInfo curLockInfo) {
+    curLockInfo.writeUnlock();
+    updateLru(curLockInfo);
+    writeLockCnt.decrementAndGet();
+  }
+
+  private void updateLru(LockInfo previous) {
+    // the LRU position is updated on unlock, since a just-released lock is the best cleanup candidate
+    lru.remove(previous);
+    lru.add(previous);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Lock status: Write lock count %d, Read lock Count 
%d", writeLockCnt.get(), readLockCnt.get());
+  }
+  /**
+   * lock info about lock tree.
+   */
+  public static class LockInfo implements Comparable<LockInfo> {
+    private String key;
+    private Map<String, LockInfo> lockMap = new ConcurrentHashMap<>();
+    private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+    private LockInfo parent;
+    public LockInfo(LockInfo p, String k) {
+      parent = p;
+      this.key = k;
+    }
+    public LockInfo get(String path) {
+      return lockMap.get(path);
+    }
+    public LockInfo addPath(String path) {
+      // ConcurrentHashMap.computeIfAbsent is atomic
+      return lockMap.computeIfAbsent(path, k -> new LockInfo(this, k));
+    }
+    public void readLock() throws OMException {
+      try {
+        boolean b = rwLock.readLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+        if (!b) {
+          throw new OMException("Unable to get read lock for " + key + " after " + LOCK_TIMEOUT + "ms",
+              OMException.ResultCodes.TIMEOUT);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new OMException("Interrupted while acquiring read lock for " + key,
+            OMException.ResultCodes.INTERNAL_ERROR);
+      }
+    }
+    public void readUnlock() {
+      rwLock.readLock().unlock();
+    }
+    public void writeLock() throws IOException {
+      try {
+        boolean b = writeLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+        if (!b) {
+          throw new OMException("Unable to get write lock for " + key + " 
after " + LOCK_TIMEOUT + "ms",
+              OMException.ResultCodes.TIMEOUT);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    public void writeUnlock() {
+      rwLock.writeLock().unlock();
+    }
+    private boolean writeLock(long time, TimeUnit timeUnit) throws InterruptedException {
+      return rwLock.writeLock().tryLock(time, timeUnit);
+    }
+    private void remove(String path) {
+      lockMap.remove(path);
+    }
+    public boolean removeSelf() throws InterruptedException {
+      boolean selfLock = false;
+      boolean parentLock = false;
+      try {
+        selfLock = writeLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+        if (selfLock) {
+          parentLock = parent.writeLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+          if (parentLock) {
+            parent.remove(key);
+          }
+        }
+      } finally {
+        if (selfLock) {
+          writeUnlock();
+        }
+        if (parentLock) {
+          parent.writeUnlock();
+        }
+      }
+      return parentLock;
+    }
+
+    public LockInfo getParent() {
+      return parent;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof LockInfo)) {
+        return false;
+      }
+      if (this == other) {
+        return true;
+      }
+      return this.compareTo((LockInfo) other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public int compareTo(LockInfo o) {
+      int rst = Integer.compare(parent.hashCode(), o.parent.hashCode());
+      if (rst == 0) {
+        return key.compareTo(o.key);
+      }
+      return rst;
+    }
+  }
+}
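
A hedged usage sketch for FSOPrefixLocking: read locks are taken on each path component top-down and released in reverse, with a write lock hanging off the deepest read-locked node (names are illustrative, error handling elided):

    FSOPrefixLocking locking = new FSOPrefixLocking("om-");
    List<FSOPrefixLocking.LockInfo> readLocks =
        locking.readLock(Arrays.asList("vol", "bucket", "dir1"));
    FSOPrefixLocking.LockInfo leaf =
        locking.writeLock(readLocks.get(readLocks.size() - 1), "file1");
    try {
      // mutate vol/bucket/dir1/file1 metadata here
    } finally {
      locking.writeUnlock(leaf);
      locking.readUnlock(readLocks);
    }
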
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java
new file mode 100644
index 0000000000..ad6ddc6dc1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.util.concurrent.Striped;
+import java.io.IOException;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Striped read/write locking per key.
+ */
+public class KeyLocking {
+  private static final Logger LOG = LoggerFactory.getLogger(KeyLocking.class);
+  private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 10240;
+  private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
+  private Striped<ReadWriteLock> fileStripedLock = Striped.readWriteLock(DEFAULT_FILE_LOCK_STRIPED_SIZE);
+  private AtomicLong writeLockCount = new AtomicLong();
+  private AtomicLong readLockCount = new AtomicLong();
+  private AtomicLong failedLockCount = new AtomicLong();
+  private AtomicLong failedUnlockCount = new AtomicLong();
+
+  public void lock(List<String> keyList) throws IOException {
+    for (String key : keyList) {
+      lock(key);
+    }
+  }
+
+  public void unlock(List<String> keyList) {
+    ListIterator<String> itr = keyList.listIterator(keyList.size());
+    while (itr.hasPrevious()) {
+      unlock(itr.previous());
+    }
+  }
+
+  public void lock(String key) throws IOException {
+    LOG.debug("Key {} is locked for instance {} {}", key, this, 
fileStripedLock.get(key));
+    try {
+      boolean b = fileStripedLock.get(key).writeLock().tryLock(LOCK_TIMEOUT, 
TimeUnit.MILLISECONDS);
+      if (!b) {
+        LOG.error("Key {} lock is failed after wait of {}ms", key, 
LOCK_TIMEOUT);
+        failedLockCount.incrementAndGet();
+        throw new OMException("Unable to get write lock for " + key + " after 
" + LOCK_TIMEOUT + "ms"
+            + ", lock info: " + fileStripedLock.get(key).readLock(),
+            OMException.ResultCodes.TIMEOUT);
+      }
+      writeLockCount.incrementAndGet();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void unlock(String key) {
+    LOG.debug("Key {} is un-locked for instance {} {}", key, this, 
fileStripedLock.get(key));
+    try {
+      fileStripedLock.get(key).writeLock().unlock();
+    } catch (Throwable th) {
+      LOG.error("Key {} un-lock is failed", key, th);
+      failedUnlockCount.incrementAndGet();
+      throw th;
+    }
+    writeLockCount.decrementAndGet();
+  }
+
+  public void readLock(List<String> keyList) throws OMException {
+    for (String key : keyList) {
+      readLock(key);
+    }
+  }
+
+  public void readUnlock(List<String> keyList) {
+    ListIterator<String> itr = keyList.listIterator(keyList.size());
+    while (itr.hasPrevious()) {
+      readUnlock(itr.previous());
+    }
+  }
+  public void readLock(String key) throws OMException {
+    try {
+      LOG.debug("Key {} is read locked for instance {} {}", key, this, 
fileStripedLock.get(key));
+      boolean b = fileStripedLock.get(key).readLock().tryLock(LOCK_TIMEOUT, 
TimeUnit.MILLISECONDS);
+      if (!b) {
+        failedLockCount.incrementAndGet();
+        throw new OMException("Unable to get read lock for " + key + " after " 
+ LOCK_TIMEOUT + "ms",
+            OMException.ResultCodes.TIMEOUT);
+      }
+      readLockCount.incrementAndGet();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void readUnlock(String key) {
+    try {
+      LOG.debug("Key {} is read un-locked for instance {} {}", key, this, 
fileStripedLock.get(key));
+      fileStripedLock.get(key).readLock().unlock();
+    } catch (Throwable th) {
+      LOG.error("Key {} read un-lock is failed", key, th);
+      failedUnlockCount.incrementAndGet();
+      throw th;
+    }
+    readLockCount.decrementAndGet();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Lock status: Write lock count %d, Read lock Count 
%d, failed lock count %d" +
+        ", failed un-lock count %d", writeLockCount.get(), 
readLockCount.get(), failedLockCount.get(),
+        failedUnlockCount.get());
+  }
+}
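
A hedged usage sketch for KeyLocking: Guava's Striped maps each key onto one of 10240 read-write locks, so unrelated keys rarely contend. Callers are expected to lock key lists in sorted order (OmLockOpr sorts them) to avoid deadlocks:

    KeyLocking keyLocking = new KeyLocking();
    List<String> keys = Arrays.asList("key-a", "key-b"); // already sorted
    keyLocking.lock(keys);
    try {
      // update both keys, exclusive against other writers
    } finally {
      keyLocking.unlock(keys);
    }
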
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java
new file mode 100644
index 0000000000..8ce7855bc5
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lock operations for OM requests.
+ */
+public class OmLockOpr {
+  private static final Logger LOG = LoggerFactory.getLogger(OmLockOpr.class);
+  private static final long MONITOR_DELAY = 10 * 60 * 1000;
+  private static KeyLocking keyLocking;
+  // volume locking needs a separate instance, as different lock key types can have the same name
+  private static KeyLocking volumeLocking;
+  private static KeyLocking bucketLocking;
+  private static KeyLocking snapshotLocking;
+  private static FSOPrefixLocking prefixLocking;
+  private static Map<OmLockOpr, OmLockOpr> lockedObjMap;
+  private static ScheduledExecutorService executorService;
+
+  private LockType type;
+  private String volumeName;
+  private String bucketName;
+  private String keyName;
+  private List<String> keyNameList;
+  private List<List<FSOPrefixLocking.LockInfo>> lockedKeyFsoList = null;
+  private List<String> lockedParentList = null;
+  private OMLockDetails lockDetails = new OMLockDetails();
+  private long lockTakenTime = 0;
+
+  public OmLockOpr() {
+    this.type = LockType.NONE;
+  }
+
+  public OmLockOpr(LockType type, String keyName) {
+    assert (type == LockType.W_VOLUME || type == LockType.R_VOLUME || type == LockType.W_BUCKET
+        || type == LockType.WRITE);
+    this.type = type;
+    this.keyName = keyName;
+  }
+  public OmLockOpr(LockType type, String volumeName, String bucketName) {
+    assert (type == LockType.RW_VOLUME_BUCKET);
+    this.type = type;
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+  }
+  public OmLockOpr(LockType type, String volume, String bucket, String keyName) {
+    this.type = type;
+    this.volumeName = volume;
+    this.bucketName = bucket;
+    this.keyName = keyName;
+  }
+
+  public OmLockOpr(LockType type, String volume, String bucket, List<String> keyNameList) {
+    this.type = type;
+    this.volumeName = volume;
+    this.bucketName = bucket;
+    this.keyNameList = new ArrayList<>(keyNameList);
+    Collections.sort(this.keyNameList);
+  }
+  public static void init(String threadNamePrefix) {
+    keyLocking = new KeyLocking();
+    volumeLocking = new KeyLocking();
+    snapshotLocking = new KeyLocking();
+    bucketLocking = new KeyLocking();
+    prefixLocking = new FSOPrefixLocking(threadNamePrefix);
+    lockedObjMap = new ConcurrentHashMap<>();
+    // init scheduler to check and monitor
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat(threadNamePrefix + "OmLockOpr-Monitor-%d").build();
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    executorService.scheduleWithFixedDelay(() -> monitor(), 0, MONITOR_DELAY, TimeUnit.MILLISECONDS);
+  }
+  
+  public static void stop() {
+    executorService.shutdown();
+    prefixLocking.stop();
+  }
+
+  public static void monitor() {
+    LOG.info("FSO: " + prefixLocking.toString());
+    LOG.info("Volume: " + volumeLocking.toString());
+    LOG.info("Snapshot: " + snapshotLocking.toString());
+    LOG.info("Key: " + keyLocking.toString());
+    LOG.info("Key: " + bucketLocking.toString());
+    LOG.info("Lock operation Status crossing threshold (10 minutes):");
+    long startTime = Time.monotonicNowNanos();
+    for (Map.Entry<OmLockOpr, OmLockOpr> entry : lockedObjMap.entrySet()) {
+      if ((startTime - entry.getKey().getLockTakenTime()) > TimeUnit.MILLISECONDS.toNanos(MONITOR_DELAY)) {
+        LOG.info("Lock is held for {}", entry.getKey());
+      }
+    }
+  }
+
+  private long getLockTakenTime() {
+    return lockTakenTime;
+  }
+
+  public void lock(OzoneManager om) throws IOException {
+    long startTime, endTime;
+    switch (type) {
+    case W_FSO:
+      startTime = Time.monotonicNowNanos();
+      bucketLocking.readLock(bucketName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      lockedKeyFsoList = new ArrayList<>();
+      if (keyName != null) {
+        lockedKeyFsoList.add(lockFso(om.getMetadataManager(), volumeName, bucketName, keyName, lockDetails));
+      } else {
+        for (String key : keyNameList) {
+          lockedKeyFsoList.add(lockFso(om.getMetadataManager(), volumeName, bucketName, key, lockDetails));
+        }
+      }
+      break;
+    case W_OBS:
+      startTime = Time.monotonicNowNanos();
+      bucketLocking.readLock(bucketName);
+      if (null != keyName) {
+        keyLocking.lock(keyName);
+      } else {
+        keyLocking.lock(keyNameList);
+      }
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case W_LEGACY_FSO:
+      startTime = Time.monotonicNowNanos();
+      bucketLocking.readLock(bucketName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      lockedParentList = getMissingParentPathList(om.getMetadataManager(), volumeName, bucketName, keyName);
+      startTime = Time.monotonicNowNanos();
+      keyLocking.lock(lockedParentList);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case SNAPSHOT:
+      startTime = Time.monotonicNowNanos();
+      bucketLocking.readLock(bucketName);
+      if (keyName != null) {
+        snapshotLocking.lock(keyName);
+      } else {
+        snapshotLocking.lock(keyNameList);
+      }
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case RW_VOLUME_BUCKET:
+      startTime = Time.monotonicNowNanos();
+      volumeLocking.readLock(volumeName);
+      bucketLocking.lock(bucketName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case W_VOLUME:
+      startTime = Time.monotonicNowNanos();
+      volumeLocking.lock(keyName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case R_VOLUME:
+      startTime = Time.monotonicNowNanos();
+      volumeLocking.readLock(keyName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case W_BUCKET:
+      startTime = Time.monotonicNowNanos();
+      bucketLocking.lock(keyName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case WRITE:
+      startTime = Time.monotonicNowNanos();
+      keyLocking.lock(keyName);
+      endTime = Time.monotonicNowNanos();
+      lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      break;
+    case NONE:
+      break;
+    default:
+      LOG.error("Invalid Lock Operation type {}", type);
+    }
+    lockTakenTime = Time.monotonicNowNanos();
+    lockDetails.setLockAcquired(true);
+    lockedObjMap.put(this, this);
+  }
+
+  public void unlock() {
+    if (!lockDetails.isLockAcquired()) {
+      return;
+    }
+    lockDetails.setLockAcquired(false);
+    switch (type) {
+    case W_FSO:
+      for (List<FSOPrefixLocking.LockInfo> lockedFsoList : lockedKeyFsoList) {
+        unlockFso(lockedFsoList);
+      }
+      bucketLocking.readUnlock(bucketName);
+      break;
+    case W_OBS:
+      if (null != keyName) {
+        keyLocking.unlock(keyName);
+      } else {
+        keyLocking.unlock(keyNameList);
+      }
+      bucketLocking.readUnlock(bucketName);
+      break;
+    case W_LEGACY_FSO:
+      keyLocking.unlock(lockedParentList);
+      bucketLocking.readUnlock(bucketName);
+      break;
+    case SNAPSHOT:
+      if (keyName != null) {
+        snapshotLocking.unlock(keyName);
+      } else {
+        snapshotLocking.unlock(keyNameList);
+      }
+      bucketLocking.readUnlock(bucketName);
+      break;
+    case RW_VOLUME_BUCKET:
+      bucketLocking.unlock(bucketName);
+      volumeLocking.readUnlock(volumeName);
+      break;
+    case W_VOLUME:
+      volumeLocking.unlock(keyName);
+      break;
+    case R_VOLUME:
+      volumeLocking.readUnlock(keyName);
+      break;
+    case W_BUCKET:
+      bucketLocking.unlock(keyName);
+      break;
+    case WRITE:
+      keyLocking.unlock(keyName);
+      break;
+    case NONE:
+      break;
+    default:
+      LOG.error("Invalid un-lock Operation type {}", type);
+    }
+    if (lockTakenTime > 0) {
+      if (type == LockType.R_VOLUME) {
+        lockDetails.add(Time.monotonicNowNanos() - lockTakenTime, OMLockDetails.LockOpType.READ);
+      } else {
+        lockDetails.add(Time.monotonicNowNanos() - lockTakenTime, OMLockDetails.LockOpType.WRITE);
+      }
+    }
+    lockedObjMap.remove(this);
+  }
+  private static List<FSOPrefixLocking.LockInfo> lockFso(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName, String keyName,
+      OMLockDetails lockDetails) throws IOException {
+    List<String> pathList = getPathList(keyName);
+    List<String> dirList = pathList.subList(0, pathList.size() - 1);
+    long startTime = Time.monotonicNowNanos();
+    List<FSOPrefixLocking.LockInfo> locks = prefixLocking.readLock(dirList);
+    long endTime = Time.monotonicNowNanos();
+    lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+    // validate for path exist
+    final long volumeId = omMetadataManager.getVolumeId(volumeName);
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+    final long bucketId = omBucketInfo.getObjectID();
+    long parentId = omBucketInfo.getObjectID();
+    int splitIndex = -1;
+    for (int i = 0; i < dirList.size(); ++i) {
+      String dbNodeName = omMetadataManager.getOzonePathKey(volumeId, bucketId, parentId, dirList.get(i));
+      OmDirectoryInfo omDirInfo = omMetadataManager.getDirectoryTable().get(dbNodeName);
+      if (omDirInfo != null) {
+        parentId = omDirInfo.getObjectID();
+        continue;
+      }
+      if (omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED).isExist(dbNodeName)) {
+        // a file exists where a directory is expected in the path; the directory is missing,
+        // so we cannot traverse deeper and take the write lock from this point
+        LOG.debug("File {} exists in path; using it as split point for write lock on path {}", dbNodeName, keyName);
+      }
+      splitIndex = i;
+      break;
+    }
+    
+    // update lock with write
+    startTime = Time.monotonicNowNanos();
+    if (splitIndex == 0) {
+      prefixLocking.readUnlock(locks);
+      locks.clear();
+      locks.add(prefixLocking.writeLock(null, dirList.get(splitIndex)));
+    } else if (splitIndex != -1) {
+      List<FSOPrefixLocking.LockInfo> releaseLocks = locks.subList(splitIndex, dirList.size());
+      prefixLocking.readUnlock(releaseLocks);
+      locks = locks.subList(0, splitIndex);
+      locks.add(prefixLocking.writeLock(locks.get(splitIndex - 1), dirList.get(splitIndex)));
+    } else {
+      String leafNode = pathList.get(pathList.size() - 1);
+      if (locks.isEmpty()) {
+        locks.add(prefixLocking.writeLock(null, leafNode));
+      } else {
+        locks.add(prefixLocking.writeLock(locks.get(locks.size() - 1), 
leafNode));
+      }
+    }
+    endTime = Time.monotonicNowNanos();
+    lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+    return locks;
+  }
+
+  private static void unlockFso(List<FSOPrefixLocking.LockInfo> acquiredLocks) {
+    if (acquiredLocks.isEmpty()) {
+      return;
+    }
+    prefixLocking.writeUnlock(acquiredLocks.get(acquiredLocks.size() - 1));
+    if (acquiredLocks.size() > 1) {
+      prefixLocking.readUnlock(acquiredLocks.subList(0, acquiredLocks.size() - 1));
+    }
+  }
+
+  private static List<String> getPathList(String fullPath) {
+    List<String> pathList = new ArrayList<>();
+    Path path = Paths.get(fullPath);
+    Iterator<Path> elements = path.iterator();
+    while (elements.hasNext()) {
+      pathList.add(elements.next().toString());
+    }
+    return pathList;
+  }
+
+  private static List<String> getMissingParentPathList(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName, String fullPath) throws IOException {
+    List<String> pathList = new ArrayList<>();
+    Path path = Paths.get(fullPath);
+    while (path != null) {
+      String pathName = path.toString();
+      String dbKeyName = omMetadataManager.getOzoneKey(volumeName, bucketName, pathName);
+      String dbDirKeyName = omMetadataManager.getOzoneDirKey(volumeName, bucketName, pathName);
+      if (!omMetadataManager.getKeyTable(BucketLayout.LEGACY).isExist(dbKeyName)
+          && !omMetadataManager.getKeyTable(BucketLayout.LEGACY).isExist(dbDirKeyName)) {
+        pathList.add(pathName);
+      }
+      path = path.getParent();
+    }
+    return pathList;
+  }
+
+  public OMLockDetails getLockDetails() {
+    return lockDetails;
+  }
+
+  public String toString() {
+    return String.format("type %s, volumeName %s, bucketName %s, keyName %s, 
keyNameList %s",
+        type.name(), volumeName, bucketName, keyName, keyNameList);
+  }
+  /**
+   * Lock operation type.
+   */
+  public enum LockType {
+    NONE,
+    // Lock as per FSO structure: read locks down the existing path, write lock from the first missing element
+    // volume name, bucket name and either keyName or keyNameList is mandatory
+    W_FSO,
+    // Lock as per OBS, all elements with write lock
+    W_OBS,
+    // Lock for Legacy FSO: identify missing path elements and take write locks on them
+    W_LEGACY_FSO,
+    W_VOLUME,
+    R_VOLUME,
+    W_BUCKET,
+    WRITE,
+    // read lock on the volume, write lock on the bucket
+    RW_VOLUME_BUCKET,
+    // snapshot locking: read lock on the bucket, write lock on the snapshot name
+    SNAPSHOT
+  }
+}
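
A hedged end-to-end sketch of how a request handler might drive OmLockOpr; init is one-time setup and the OzoneManager instance om is assumed to be available:

    OmLockOpr.init("om-");
    OmLockOpr lockOpr = new OmLockOpr(OmLockOpr.LockType.W_OBS, "vol1", "bucket1", "key1");
    lockOpr.lock(om);
    try {
      // OBS key write under bucket read lock + key write lock
    } finally {
      lockOpr.unlock();
    }
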
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestLockUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestLockUtils.java
new file mode 100644
index 0000000000..196e7580bc
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestLockUtils.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.lock.OmLockOpr.LockType;
+import org.apache.hadoop.ozone.om.request.util.ObjectParser;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+
+/**
+ * Map request lock for lock and unlock.
+ */
+public final class OmRequestLockUtils {
+  private static Map<OzoneManagerProtocolProtos.Type,
+      BiFunction<OzoneManager, OzoneManagerProtocolProtos.OMRequest, 
OmLockOpr>> reqLockMap = new HashMap<>();
+  private static final OmLockOpr NO_LOCK = new OmLockOpr();
+  private static final OmLockOpr EXP_LOCK = new OmLockOpr();
+
+  private OmRequestLockUtils() {
+  }
+
+  public static void init() {
+    reqLockMap.put(Type.CreateVolume, (om, req) ->
+        new OmLockOpr(LockType.W_VOLUME, 
req.getCreateVolumeRequest().getVolumeInfo().getVolume()));
+    reqLockMap.put(Type.SetVolumeProperty, (om, req) ->
+        new OmLockOpr(LockType.R_VOLUME, 
req.getSetVolumePropertyRequest().getVolumeName()));
+    reqLockMap.put(Type.DeleteVolume, (om, req) ->
+        new OmLockOpr(LockType.W_VOLUME, 
req.getDeleteVolumeRequest().getVolumeName()));
+    reqLockMap.put(Type.CreateBucket, (om, req) -> new 
OmLockOpr(LockType.RW_VOLUME_BUCKET,
+        req.getCreateBucketRequest().getBucketInfo().getVolumeName(),
+        req.getCreateBucketRequest().getBucketInfo().getBucketName()));
+    reqLockMap.put(Type.DeleteBucket, (om, req) ->
+        new OmLockOpr(LockType.W_BUCKET, 
req.getDeleteBucketRequest().getBucketName()));
+    reqLockMap.put(Type.SetBucketProperty, (om, req) ->
+        new OmLockOpr(LockType.W_BUCKET, 
req.getSetBucketPropertyRequest().getBucketArgs().getBucketName()));
+    reqLockMap.put(Type.AddAcl, (om, req) -> aclLockInfo(om, 
req.getAddAclRequest().getObj()));
+    reqLockMap.put(Type.RemoveAcl, (om, req) -> aclLockInfo(om, req.getRemoveAclRequest().getObj()));
+    reqLockMap.put(Type.SetAcl, (om, req) -> aclLockInfo(om, req.getSetAclRequest().getObj()));
+    reqLockMap.put(Type.GetS3Secret, (om, req) -> new OmLockOpr(LockType.WRITE,
+        req.getGetS3SecretRequest().getKerberosID()));
+    reqLockMap.put(Type.SetS3Secret, (om, req) -> new OmLockOpr(LockType.WRITE,
+        req.getSetS3SecretRequest().getAccessId()));
+    reqLockMap.put(Type.RevokeS3Secret, (om, req) -> new 
OmLockOpr(LockType.WRITE,
+        req.getRevokeS3SecretRequest().getKerberosID()));
+    reqLockMap.put(Type.GetDelegationToken, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.CancelDelegationToken, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.RenewDelegationToken, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.FinalizeUpgrade, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.Prepare, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.CancelPrepare, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.SetRangerServiceVersion, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.EchoRPC, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.QuotaRepair, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.AbortExpiredMultiPartUploads, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.PurgeKeys, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.PurgeDirectories, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.SnapshotMoveDeletedKeys, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.SnapshotMoveTableKeys, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.SnapshotPurge, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.SetSnapshotProperty, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.DeleteOpenKeys, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.PersistDb, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.CreateTenant, (om, req) ->
+        new OmLockOpr(LockType.W_VOLUME, 
req.getCreateTenantRequest().getVolumeName()));
+    reqLockMap.put(Type.DeleteTenant, (om, req) ->
+        getTenantLock(om, LockType.W_VOLUME, 
req.getDeleteTenantRequest().getTenantId()));
+    reqLockMap.put(Type.TenantAssignUserAccessId, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, 
req.getTenantAssignUserAccessIdRequest().getTenantId()));
+    reqLockMap.put(Type.TenantRevokeUserAccessId, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, 
req.getTenantRevokeUserAccessIdRequest().getTenantId()));
+    reqLockMap.put(Type.TenantAssignAdmin, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, 
req.getTenantAssignAdminRequest().getTenantId()));
+    reqLockMap.put(Type.TenantRevokeAdmin, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, 
req.getTenantRevokeAdminRequest().getTenantId()));
+    reqLockMap.put(Type.CreateSnapshot, (om, req) -> new 
OmLockOpr(LockType.SNAPSHOT,
+        req.getCreateSnapshotRequest().getVolumeName(), 
req.getCreateSnapshotRequest().getBucketName(),
+        req.getCreateSnapshotRequest().getSnapshotName()));
+    reqLockMap.put(Type.DeleteSnapshot, (om, req) -> new 
OmLockOpr(LockType.SNAPSHOT,
+        req.getDeleteSnapshotRequest().getVolumeName(), 
req.getDeleteSnapshotRequest().getBucketName(),
+        req.getDeleteSnapshotRequest().getSnapshotName()));
+    reqLockMap.put(Type.RenameSnapshot, (om, req) -> new 
OmLockOpr(LockType.SNAPSHOT,
+        req.getRenameSnapshotRequest().getVolumeName(), 
req.getRenameSnapshotRequest().getBucketName(),
+        Arrays.asList(req.getRenameSnapshotRequest().getSnapshotOldName(),
+            req.getRenameSnapshotRequest().getSnapshotNewName())));
+    reqLockMap.put(Type.RecoverLease, (om, req) -> new 
OmLockOpr(LockType.W_FSO,
+        req.getRecoverLeaseRequest().getVolumeName(), 
req.getRecoverLeaseRequest().getBucketName(),
+        req.getRecoverLeaseRequest().getKeyName()));
+    reqLockMap.put(Type.CreateDirectory, (om, req) -> getKeyLock(om, 
req.getCreateDirectoryRequest().getKeyArgs()));
+    reqLockMap.put(Type.CreateFile, (om, req) -> getKeyLock(om, 
req.getCreateFileRequest().getKeyArgs()));
+    reqLockMap.put(Type.CreateKey, (om, req) -> getKeyLock(om, 
req.getCreateKeyRequest().getKeyArgs()));
+    reqLockMap.put(Type.AllocateBlock, (om, req) -> getKeyLock(om, 
req.getAllocateBlockRequest().getKeyArgs()));
+    reqLockMap.put(Type.CommitKey, (om, req) -> getKeyLock(om, 
req.getCommitKeyRequest().getKeyArgs()));
+    reqLockMap.put(Type.DeleteKey, (om, req) -> getKeyLock(om, 
req.getDeleteKeyRequest().getKeyArgs()));
+    reqLockMap.put(Type.DeleteKeys, (om, req) -> getDeleteKeyLock(om, 
req.getDeleteKeysRequest().getDeleteKeys()));
+    reqLockMap.put(Type.RenameKey, (om, req) -> getRenameKey(om, 
req.getRenameKeyRequest()));
+    reqLockMap.put(Type.RenameKeys, (om, req) -> {
+      OzoneManagerProtocolProtos.RenameKeysArgs renameKeysArgs = 
req.getRenameKeysRequest().getRenameKeysArgs();
+      List<String> lockKeyList = new ArrayList<>();
+      renameKeysArgs.getRenameKeysMapList().forEach(e -> {
+        lockKeyList.add(e.getFromKeyName());
+        lockKeyList.add(e.getToKeyName());
+      });
+      return new OmLockOpr(LockType.W_OBS, renameKeysArgs.getVolumeName(), 
renameKeysArgs.getBucketName(), lockKeyList);
+    });
+    reqLockMap.put(Type.InitiateMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getInitiateMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.CommitMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getCommitMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.AbortMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getAbortMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.CompleteMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getCompleteMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.SetTimes, (om, req) -> getKeyLock(om, 
req.getSetTimesRequest().getKeyArgs()));
+  }
+  
+  private static OmLockOpr aclLockInfo(OzoneManager om, 
OzoneManagerProtocolProtos.OzoneObj obj) {
+    try {
+      if (obj.getResType() == 
OzoneManagerProtocolProtos.OzoneObj.ObjectType.VOLUME) {
+        return new OmLockOpr(LockType.R_VOLUME, obj.getPath().substring(1));
+      } else if (obj.getResType() == 
OzoneManagerProtocolProtos.OzoneObj.ObjectType.BUCKET) {
+        ObjectParser objParser = new ObjectParser(obj.getPath(), 
OzoneManagerProtocolProtos.OzoneObj.ObjectType.BUCKET);
+        return new OmLockOpr(LockType.RW_VOLUME_BUCKET, objParser.getVolume(), 
objParser.getBucket());
+      } else if (obj.getResType() == 
OzoneManagerProtocolProtos.OzoneObj.ObjectType.KEY) {
+        ObjectParser objParser = new ObjectParser(obj.getPath(), 
OzoneManagerProtocolProtos.OzoneObj.ObjectType.KEY);
+        String bucketKey = 
om.getMetadataManager().getBucketKey(objParser.getVolume(), 
objParser.getBucket());
+        OmBucketInfo bucketInfo = 
om.getMetadataManager().getBucketTable().get(bucketKey);
+        if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+          return new OmLockOpr(LockType.W_FSO, objParser.getVolume(), 
objParser.getBucket(), objParser.getKey());
+        }
+        if 
(bucketInfo.getBucketLayout().isObjectStore(om.getEnableFileSystemPaths())) {
+          return new OmLockOpr(LockType.W_OBS, objParser.getVolume(), 
objParser.getBucket(), objParser.getKey());
+        }
+        // Legacy FSO follows the same locking path as OBS, using the full key path
+        return new OmLockOpr(LockType.W_OBS, objParser.getVolume(), 
objParser.getBucket(), objParser.getKey());
+      } else {
+        // prefix type with full path
+        return new OmLockOpr(LockType.WRITE, obj.getPath());
+      }
+    } catch (Exception ex) {
+      return EXP_LOCK;
+    }
+  }
+  
+  private static OmLockOpr getTenantLock(OzoneManager om, LockType type, 
String tenantId) {
+    try {
+      OmDBTenantState omDBTenantState = 
om.getMetadataManager().getTenantStateTable().get(tenantId);
+      return new OmLockOpr(type, omDBTenantState.getBucketNamespaceName());
+    } catch (Exception ex) {
+      return EXP_LOCK;
+    }
+  }
+
+  private static OmLockOpr getKeyLock(OzoneManager om, 
OzoneManagerProtocolProtos.KeyArgs args) {
+    try {
+      String bucketKey = 
om.getMetadataManager().getBucketKey(args.getVolumeName(), 
args.getBucketName());
+      OmBucketInfo bucketInfo = 
om.getMetadataManager().getBucketTable().get(bucketKey);
+      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+        return new OmLockOpr(LockType.W_FSO, args.getVolumeName(), 
args.getBucketName(), args.getKeyName());
+      }
+      if 
(bucketInfo.getBucketLayout().isObjectStore(om.getEnableFileSystemPaths())) {
+        return new OmLockOpr(LockType.W_OBS, args.getVolumeName(), 
args.getBucketName(), args.getKeyName());
+      }
+      // for Legacy FSO, extract all paths to be created and take write locks on the missing parents
+      return new OmLockOpr(LockType.W_LEGACY_FSO, args.getVolumeName(), 
args.getBucketName(), args.getKeyName());
+    } catch (Exception ex) {
+      return EXP_LOCK;
+    }
+  }
+  private static OmLockOpr getDeleteKeyLock(OzoneManager om, 
OzoneManagerProtocolProtos.DeleteKeyArgs args) {
+    String bucketKey = 
om.getMetadataManager().getBucketKey(args.getVolumeName(), 
args.getBucketName());
+    try {
+      OmBucketInfo bucketInfo = 
om.getMetadataManager().getBucketTable().get(bucketKey);
+      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+        return new OmLockOpr(LockType.W_FSO, args.getVolumeName(), 
args.getBucketName(), args.getKeysList());
+      }
+      return new OmLockOpr(LockType.W_OBS, args.getVolumeName(), 
args.getBucketName(), args.getKeysList());
+    } catch (IOException e) {
+      return EXP_LOCK;
+    }
+  }
+  private static OmLockOpr getRenameKey(OzoneManager om, 
OzoneManagerProtocolProtos.RenameKeyRequest req) {
+    String bucketKey = 
om.getMetadataManager().getBucketKey(req.getKeyArgs().getVolumeName(),
+        req.getKeyArgs().getBucketName());
+    try {
+      OmBucketInfo bucketInfo = 
om.getMetadataManager().getBucketTable().get(bucketKey);
+      if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+        return new OmLockOpr(LockType.W_FSO, req.getKeyArgs().getVolumeName(), 
req.getKeyArgs().getBucketName(),
+            Arrays.asList(req.getKeyArgs().getKeyName(), req.getToKeyName()));
+      }
+      return new OmLockOpr(LockType.W_OBS, req.getKeyArgs().getVolumeName(), 
req.getKeyArgs().getBucketName(),
+          Arrays.asList(req.getKeyArgs().getKeyName(), req.getToKeyName()));
+    } catch (IOException e) {
+      return EXP_LOCK;
+    }
+  }
+  
+  public static OmLockOpr getLockOperation(OzoneManager om, 
OzoneManagerProtocolProtos.OMRequest req) {
+    return reqLockMap.get(req.getCmdType()).apply(om, req);
+  }
+}
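The class above is a table-driven dispatch: one BiFunction per request type, registered once in init() and consulted per request by getLockOperation. A minimal self-contained sketch of the same pattern, with made-up request types and lock labels:

    import java.util.EnumMap;
    import java.util.Map;
    import java.util.function.Function;

    public class LockDispatchDemo {
      enum ReqType { CREATE_KEY, DELETE_VOLUME }

      public static void main(String[] args) {
        // one lock-factory function registered per request type
        Map<ReqType, Function<String, String>> lockMap = new EnumMap<>(ReqType.class);
        lockMap.put(ReqType.CREATE_KEY, key -> "W_FSO lock on " + key);
        lockMap.put(ReqType.DELETE_VOLUME, vol -> "W_VOLUME lock on " + vol);

        // the lookup mirrors getLockOperation above
        System.out.println(lockMap.get(ReqType.CREATE_KEY).apply("vol/buck/key"));
      }
    }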
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java
new file mode 100644
index 0000000000..8a31e22690
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks changes to bucket quota usage.
+ */
+public class BucketQuotaResource {
+  private static BucketQuotaResource quotaResource = new BucketQuotaResource();
+  public static BucketQuotaResource instance() {
+    return quotaResource;
+  }
+
+  private AtomicBoolean track = new AtomicBoolean();
+  private Map<Long, BucketQuota> bucketQuotaMap = new ConcurrentHashMap<>();
+
+  public void enableTrack() {
+    track.set(true);
+  }
+  public boolean isEnabledTrack() {
+    return track.get();
+  }
+
+  public BucketQuota get(Long bucketId) {
+    return bucketQuotaMap.computeIfAbsent(bucketId, k -> new BucketQuota());
+  }
+
+  public void remove(Long bucketId) {
+    bucketQuotaMap.remove(bucketId);
+  }
+
+  public boolean exist(Long bucketId) {
+    return bucketQuotaMap.containsKey(bucketId);
+  }
+
+  /**
+   * record bucket changes.
+   */
+  public static class BucketQuota {
+    private AtomicLong incUsedBytes = new AtomicLong();
+    private AtomicLong incUsedNamespace = new AtomicLong();
+
+    public void addUsedBytes(long bytes) {
+      incUsedBytes.addAndGet(bytes);
+    }
+
+    public long getUsedBytes() {
+      return incUsedBytes.get();
+    }
+
+    public void addUsedNamespace(long bytes) {
+      incUsedNamespace.addAndGet(bytes);
+    }
+
+    public long getUsedNamespace() {
+      return incUsedNamespace.get();
+    }
+  }
+}
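A short usage sketch for the tracker above, using only the API defined in this file; the bucket id and the byte counts are arbitrary example values:

    import org.apache.hadoop.ozone.om.ratis.execution.BucketQuotaResource;

    public class QuotaTrackDemo {
      public static void main(String[] args) {
        BucketQuotaResource resource = BucketQuotaResource.instance();
        resource.enableTrack();

        long bucketId = 42L;
        // concurrent writers accumulate deltas against the shared per-bucket counter
        resource.get(bucketId).addUsedBytes(4096);
        resource.get(bucketId).addUsedNamespace(1);

        System.out.println("pending bytes: " + resource.get(bucketId).getUsedBytes());
        resource.remove(bucketId);  // drop tracking once the bucket goes away
      }
    }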
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
index bfe51bf4bb..199485d1bd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
@@ -38,7 +38,7 @@ public class FollowerRequestExecutor {
   private final AtomicLong callId = new AtomicLong(0);
   private final OzoneManager ozoneManager;
   private AtomicLong uniqueIndex;
-  private final PoolExecutor<RequestContext> ratisSubmitter;
+  private final PoolExecutor<RequestContext, Void> ratisSubmitter;
   private final OzoneManagerRequestHandler handler;
 
   public FollowerRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
@@ -50,7 +50,7 @@ public class FollowerRequestExecutor {
       this.handler = null;
     }
     ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE, 
RATIS_TASK_QUEUE_SIZE,
-        ozoneManager.getThreadNamePrefix(), this::ratisSubmitCommand, null);
+        ozoneManager.getThreadNamePrefix() + "-FollowerExecutor", 
this::ratisSubmitCommand, null);
   }
   public void stop() {
     ratisSubmitter.stop();
@@ -63,7 +63,7 @@ public class FollowerRequestExecutor {
     ratisSubmitter.submit(idx, ctx);
   }
 
-  private void ratisSubmitCommand(Collection<RequestContext> ctxs, 
PoolExecutor<RequestContext> nxtPool) {
+  private void ratisSubmitCommand(Collection<RequestContext> ctxs, 
PoolExecutor.CheckedConsumer<Void> nxtPool) {
     for (RequestContext ctx : ctxs) {
       sendDbUpdateRequest(ctx);
     }
@@ -71,12 +71,6 @@ public class FollowerRequestExecutor {
 
   private void sendDbUpdateRequest(RequestContext ctx) {
     try {
-      if (!ozoneManager.isRatisEnabled()) {
-        OzoneManagerProtocolProtos.OMResponse response = 
OMBasicStateMachine.runCommand(ctx.getRequest(),
-            TermIndex.valueOf(-1, uniqueIndex.incrementAndGet()), handler, 
ozoneManager);
-        ctx.getFuture().complete(response);
-        return;
-      }
       // TODO: hack to transfer the leader index to follower nodes so they can use it;
       // a proper way to share the index is still needed
       OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
@@ -84,6 +78,13 @@ public class FollowerRequestExecutor {
       reqBuilder.addIndex(uniqueIndex.incrementAndGet());
       OzoneManagerProtocolProtos.OMRequest req = ctx.getRequest().toBuilder()
           .setPersistDbRequest(reqBuilder.build()).build();
+
+      if (!ozoneManager.isRatisEnabled()) {
+        OzoneManagerProtocolProtos.OMResponse response = 
OMBasicStateMachine.runCommand(req,
+            TermIndex.valueOf(-1, uniqueIndex.incrementAndGet()), handler, 
ozoneManager);
+        ctx.getFuture().complete(response);
+        return;
+      }
       OzoneManagerProtocolProtos.OMResponse response = 
ozoneManager.getOmRatisServer().submitRequest(req,
           ClientId.randomId(), callId.incrementAndGet());
       ctx.getFuture().complete(response);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
index c074918a1f..f213c0a514 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
@@ -26,11 +26,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -54,24 +56,30 @@ public class LeaderRequestExecutor {
   private static final Logger LOG = 
LoggerFactory.getLogger(LeaderRequestExecutor.class);
   private static final int REQUEST_EXECUTOR_POOL_SIZE = 1;
   private static final int REQUEST_EXECUTOR_QUEUE_SIZE = 1000;
-  private static final int RATIS_TASK_POOL_SIZE = 1;
+  private static final int MERGE_TASK_POOL_SIZE = 1;
+  private static final int MERGE_TASK_QUEUE_SIZE = 1000;
+  private static final int RATIS_TASK_POOL_SIZE = 10;
   private static final int RATIS_TASK_QUEUE_SIZE = 1000;
   private static final long DUMMY_TERM = -1;
   private final AtomicLong uniqueIndex;
   private final int ratisByteLimit;
   private final OzoneManager ozoneManager;
-  private final PoolExecutor<RequestContext> ratisSubmitter;
-  private final PoolExecutor<RequestContext> leaderExecutor;
+  private final PoolExecutor<RatisContext, Void> ratisSubmitter;
+  private final PoolExecutor<RequestContext, RatisContext> requestMerger;
+  private final PoolExecutor<RequestContext, RequestContext> leaderExecutor;
   private final OzoneManagerRequestHandler handler;
   private final AtomicBoolean isEnabled = new AtomicBoolean(true);
+  private final AtomicInteger ratisCurrentPool = new AtomicInteger(0);
 
   public LeaderRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
     this.ozoneManager = om;
     this.handler = new OzoneManagerRequestHandler(ozoneManager);
-    ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE,
-        RATIS_TASK_QUEUE_SIZE, ozoneManager.getThreadNamePrefix(), 
this::ratisSubmitCommand, null);
+    ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE, 
RATIS_TASK_QUEUE_SIZE,
+        ozoneManager.getThreadNamePrefix() + "-LeaderRatis", 
this::ratisCommand, null);
+    requestMerger = new PoolExecutor<>(MERGE_TASK_POOL_SIZE, 
MERGE_TASK_QUEUE_SIZE,
+        ozoneManager.getThreadNamePrefix() + "-LeaderMerger", 
this::requestMergeCommand, this::ratisSubmit);
     leaderExecutor = new PoolExecutor<>(REQUEST_EXECUTOR_POOL_SIZE, 
REQUEST_EXECUTOR_QUEUE_SIZE,
-        ozoneManager.getThreadNamePrefix(), this::runExecuteCommand, 
ratisSubmitter);
+        ozoneManager.getThreadNamePrefix() + "-LeaderExecutor", 
this::runExecuteCommand, this::mergeSubmit);
     int limit = (int) ozoneManager.getConfiguration().getStorageSize(
         OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
         OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
@@ -82,6 +90,7 @@ public class LeaderRequestExecutor {
   }
   public void stop() {
     leaderExecutor.stop();
+    requestMerger.stop();
     ratisSubmitter.stop();
   }
   public int batchSize() {
@@ -99,39 +108,45 @@ public class LeaderRequestExecutor {
 
   public void submit(int idx, RequestContext ctx) throws InterruptedException {
     if (!isEnabled.get()) {
-      rejectRequest(ctx);
+      rejectRequest(Collections.singletonList(ctx));
       return;
     }
-    leaderExecutor.submit(idx, ctx);
+    executeRequest(ctx, this::mergeSubmit);
+    //leaderExecutor.submit(idx, ctx);
   }
 
-  private void rejectRequest(RequestContext ctx) {
+  private void rejectRequest(Collection<RequestContext> ctxs) {
+    Throwable th;
     if (!ozoneManager.isLeaderReady()) {
       String peerId = ozoneManager.isRatisEnabled() ? 
ozoneManager.getOmRatisServer().getRaftPeerId().toString()
           : ozoneManager.getOMNodeId();
-      OMLeaderNotReadyException leaderNotReadyException = new 
OMLeaderNotReadyException(peerId
-          + " is not ready to process request yet.");
-      ctx.getFuture().completeExceptionally(leaderNotReadyException);
+      th = new OMLeaderNotReadyException(peerId + " is not ready to process 
request yet.");
     } else {
-      ctx.getFuture().completeExceptionally(new OMException("Request 
processing is disabled due to error",
-          OMException.ResultCodes.INTERNAL_ERROR));
+      th = new OMException("Request processing is disabled due to error", 
OMException.ResultCodes.INTERNAL_ERROR);
     }
-  }
-  private void rejectRequest(Collection<RequestContext> ctxs) {
-    ctxs.forEach(ctx -> rejectRequest(ctx));
+    handleBatchUpdateComplete(ctxs, th, null);
   }
 
-  private void runExecuteCommand(Collection<RequestContext> ctxs, 
PoolExecutor<RequestContext> nxtPool) {
+  private void runExecuteCommand(
+      Collection<RequestContext> ctxs, 
PoolExecutor.CheckedConsumer<RequestContext> nxtPool) {
+    if (!isEnabled.get()) {
+      rejectRequest(ctxs);
+      return;
+    }
     for (RequestContext ctx : ctxs) {
       if (!isEnabled.get()) {
-        rejectRequest(ctx);
-        return;
+        rejectRequest(Collections.singletonList(ctx));
+        continue;
       }
       executeRequest(ctx, nxtPool);
     }
   }
 
-  private void executeRequest(RequestContext ctx, PoolExecutor<RequestContext> 
nxtPool) {
+  private void mergeSubmit(RequestContext ctx) throws InterruptedException {
+    requestMerger.submit(0, ctx);
+  }
+
+  private void executeRequest(RequestContext ctx, 
PoolExecutor.CheckedConsumer<RequestContext> nxtPool) {
     OMRequest request = ctx.getRequest();
     TermIndex termIndex = TermIndex.valueOf(DUMMY_TERM, 
uniqueIndex.incrementAndGet());
     ctx.setIndex(termIndex);
@@ -146,7 +161,7 @@ public class LeaderRequestExecutor {
     } finally {
       if (ctx.getNextRequest() != null) {
         try {
-          nxtPool.submit(0, ctx);
+          nxtPool.accept(ctx);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
@@ -164,7 +179,8 @@ public class LeaderRequestExecutor {
       if (!omClientResponse.getOMResponse().getSuccess()) {
         OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex);
       } else {
-        OzoneManagerProtocolProtos.PersistDbRequest.Builder nxtRequest = 
retrieveDbChanges(termIndex, omClientResponse);
+        OzoneManagerProtocolProtos.PersistDbRequest.Builder nxtRequest
+            = retrieveDbChanges(ctx, termIndex, omClientResponse);
         if (nxtRequest != null) {
           OMRequest.Builder omReqBuilder = 
OMRequest.newBuilder().setPersistDbRequest(nxtRequest.build())
               .setCmdType(OzoneManagerProtocolProtos.Type.PersistDb);
@@ -181,16 +197,25 @@ public class LeaderRequestExecutor {
   }
 
   private OzoneManagerProtocolProtos.PersistDbRequest.Builder 
retrieveDbChanges(
-      TermIndex termIndex, OMClientResponse omClientResponse) throws 
IOException {
-    try (BatchOperation batchOperation = 
ozoneManager.getMetadataManager().getStore()
+      RequestContext ctx, TermIndex termIndex, OMClientResponse 
omClientResponse) throws IOException {
+    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    String name = metadataManager.getBucketTable().getName();
+    boolean isDbChanged = false;
+    try (BatchOperation batchOperation = metadataManager.getStore()
         .initBatchOperation()) {
-      omClientResponse.checkAndUpdateDB(ozoneManager.getMetadataManager(), 
batchOperation);
-      // get db update and raise request to flush
+      omClientResponse.checkAndUpdateDB(metadataManager, batchOperation);
+      // get db update and create request to flush
       OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
           = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
       Map<String, Map<ByteBuffer, ByteBuffer>> cachedDbTxs
           = ((RDBBatchOperation) batchOperation).getCachedTransaction();
       for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> tblEntry : 
cachedDbTxs.entrySet()) {
+        isDbChanged = true;
+        if (tblEntry.getKey().equals(name)) {
+          if (ctx.getClientRequest().getWrappedBucketInfo() instanceof 
OmBucketInfoQuotaTracker) {
+            continue;
+          }
+        }
         OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder
             = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder();
         tblBuilder.setTableName(tblEntry.getKey());
@@ -205,7 +230,7 @@ public class LeaderRequestExecutor {
         }
         reqBuilder.addTableUpdates(tblBuilder.build());
       }
-      if (reqBuilder.getTableUpdatesCount() == 0) {
+      if (!isDbChanged) {
         return null;
       }
       reqBuilder.addIndex(termIndex.getIndex());
@@ -213,11 +238,13 @@ public class LeaderRequestExecutor {
     }
   }
 
-  private void ratisSubmitCommand(Collection<RequestContext> ctxs, 
PoolExecutor<RequestContext> nxtPool) {
+  private void requestMergeCommand(
+      Collection<RequestContext> ctxs, 
PoolExecutor.CheckedConsumer<RatisContext> nxtPool) {
     if (!isEnabled.get()) {
       rejectRequest(ctxs);
       return;
     }
+    Map<Long, OzoneManagerProtocolProtos.BucketQuotaCount.Builder> 
bucketChangeMap = new HashMap<>();
     List<RequestContext> sendList = new ArrayList<>();
     OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
         = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
@@ -230,16 +257,19 @@ public class LeaderRequestExecutor {
       }
       if ((tmpSize + size) > ratisByteLimit) {
         // send current batched request
-        prepareAndSendRequest(sendList, reqBuilder);
+        appendBucketQuotaChanges(reqBuilder, bucketChangeMap);
+        prepareAndSendRequest(sendList, reqBuilder, nxtPool);
 
         // reinit and continue
         reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
         size = 0;
         sendList.clear();
+        bucketChangeMap.clear();
       }
 
       // keep adding to batch list
       size += tmpSize;
+      addBucketQuotaChanges(ctx, bucketChangeMap);
       for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) {
         OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder
             = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder();
@@ -251,30 +281,81 @@ public class LeaderRequestExecutor {
       sendList.add(ctx);
     }
     if (sendList.size() > 0) {
-      prepareAndSendRequest(sendList, reqBuilder);
+      appendBucketQuotaChanges(reqBuilder, bucketChangeMap);
+      prepareAndSendRequest(sendList, reqBuilder, nxtPool);
+    }
+  }
+
+  private void ratisSubmit(RatisContext ctx) throws InterruptedException {
+    // round-robin across the ratis submitter queues for the next set of merged requests
+    int nxtIndex = Math.abs(ratisCurrentPool.getAndIncrement() % 
RATIS_TASK_POOL_SIZE);
+    ratisSubmitter.submit(nxtIndex, ctx);
+  }
+
+  private void addBucketQuotaChanges(
+      RequestContext ctx, Map<Long, 
OzoneManagerProtocolProtos.BucketQuotaCount.Builder> quotaMap) {
+    if (ctx.getClientRequest().getWrappedBucketInfo() instanceof 
OmBucketInfoQuotaTracker) {
+      OmBucketInfoQuotaTracker info = (OmBucketInfoQuotaTracker) 
ctx.getClientRequest().getWrappedBucketInfo();
+      OzoneManagerProtocolProtos.BucketQuotaCount.Builder quotaBuilder = 
quotaMap.computeIfAbsent(
+          info.getObjectID(), k -> 
OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder()
+              
.setVolName(info.getVolumeName()).setBucketName(info.getBucketName())
+              
.setBucketObjectId(info.getObjectID()).setSupportOldQuota(false));
+      quotaBuilder.setDiffUsedBytes(quotaBuilder.getDiffUsedBytes() + 
info.getIncUsedBytes());
+      quotaBuilder.setDiffUsedNamespace(quotaBuilder.getDiffUsedNamespace() + 
info.getIncUsedNamespace());
+    }
+  }
+
+  private void appendBucketQuotaChanges(
+      OzoneManagerProtocolProtos.PersistDbRequest.Builder req,
+      Map<Long, OzoneManagerProtocolProtos.BucketQuotaCount.Builder> quotaMap) 
{
+    for (Map.Entry<Long, OzoneManagerProtocolProtos.BucketQuotaCount.Builder> 
entry : quotaMap.entrySet()) {
+      if (entry.getValue().getDiffUsedBytes() == 0 && 
entry.getValue().getDiffUsedNamespace() == 0) {
+        continue;
+      }
+      req.addBucketQuotaCount(entry.getValue().build());
     }
   }
 
   private void prepareAndSendRequest(
-      List<RequestContext> sendList, 
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder) {
+      List<RequestContext> sendList, 
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder,
+      PoolExecutor.CheckedConsumer<RatisContext> nxtPool) {
     RequestContext lastReqCtx = sendList.get(sendList.size() - 1);
     OMRequest.Builder omReqBuilder = 
OMRequest.newBuilder().setPersistDbRequest(reqBuilder.build())
         .setCmdType(OzoneManagerProtocolProtos.Type.PersistDb)
         .setClientId(lastReqCtx.getRequest().getClientId());
+    OMRequest reqBatch = omReqBuilder.build();
     try {
-      OMRequest reqBatch = omReqBuilder.build();
-      OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch, 
lastReqCtx.getIndex());
-      if (!dbUpdateRsp.getSuccess()) {
-        throw new OMException(dbUpdateRsp.getMessage(),
-            
OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]);
-      }
-      handleBatchUpdateComplete(sendList, null, 
dbUpdateRsp.getLeaderOMNodeId());
-    } catch (Throwable e) {
-      LOG.warn("Failed to write, Exception occurred ", e);
+      nxtPool.accept(new RatisContext(sendList, reqBatch));
+    } catch (InterruptedException e) {
       handleBatchUpdateComplete(sendList, e, null);
+      Thread.currentThread().interrupt();
     }
   }
 
+  private void ratisCommand(Collection<RatisContext> ctxs, 
PoolExecutor.CheckedConsumer<Void> nxtPool) {
+    if (!isEnabled.get()) {
+      for (RatisContext ctx : ctxs) {
+        rejectRequest(ctx.getRequestContexts());
+      }
+      return;
+    }
+    for (RatisContext ctx : ctxs) {
+      List<RequestContext> sendList = ctx.getRequestContexts();
+      RequestContext lastReqCtx = sendList.get(sendList.size() - 1);
+      OMRequest reqBatch = ctx.getRequest();
+      try {
+        OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch, 
lastReqCtx.getIndex());
+        if (!dbUpdateRsp.getSuccess()) {
+          throw new OMException(dbUpdateRsp.getMessage(),
+              
OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]);
+        }
+        handleBatchUpdateComplete(sendList, null, 
dbUpdateRsp.getLeaderOMNodeId());
+      } catch (Throwable e) {
+        LOG.warn("Failed to write, Exception occurred ", e);
+        handleBatchUpdateComplete(sendList, e, null);
+      }
+    }
+  }
   private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex 
termIndex) throws Exception {
     try {
       if (!ozoneManager.isRatisEnabled()) {
@@ -300,8 +381,29 @@ public class LeaderRequestExecutor {
     return omResponse;
   }
   private void handleBatchUpdateComplete(Collection<RequestContext> ctxs, 
Throwable th, String leaderOMNodeId) {
+    // TODO: with the no-cache switch, cache cleanup will no longer be needed
     Map<String, List<Long>> cleanupMap = new HashMap<>();
     for (RequestContext ctx : ctxs) {
+      // cache cleanup
+      if (null != ctx.getNextRequest()) {
+        List<OzoneManagerProtocolProtos.DBTableUpdate> tblList = 
ctx.getNextRequest().getTableUpdatesList();
+        for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) {
+          List<Long> epochs = 
cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>());
+          epochs.add(ctx.getIndex().getIndex());
+        }
+      }
+    }
+    // clean the accumulated epochs once, after collecting across all contexts
+    for (Map.Entry<String, List<Long>> entry : cleanupMap.entrySet()) {
+      ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue());
+    }
+
+    for (RequestContext ctx : ctxs) {
+      if (ctx.getClientRequest().getWrappedBucketInfo() instanceof 
OmBucketInfoQuotaTracker) {
+        // reset must run for both success and failure to update the resource quota;
+        // it is done here even on success, as its execution cannot easily be guaranteed on the nodes
+        ((OmBucketInfoQuotaTracker) 
ctx.getClientRequest().getWrappedBucketInfo()).reset();
+      }
+
       if (th != null) {
         OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), 
ctx.getClientRequest(), ozoneManager,
             ctx.getIndex(), th);
@@ -310,9 +412,6 @@ public class LeaderRequestExecutor {
         } else {
           ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), new 
IOException(th)));
         }
-
-        // TODO: no-cache, remove disable processing, let every request deal 
with ratis failure
-        disableProcessing();
       } else {
         OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), 
ctx.getIndex());
         OMResponse newRsp = ctx.getResponse();
@@ -321,19 +420,22 @@ public class LeaderRequestExecutor {
         }
         ctx.getFuture().complete(newRsp);
       }
+    }
+  }
 
-      // cache cleanup
-      if (null != ctx.getNextRequest()) {
-        List<OzoneManagerProtocolProtos.DBTableUpdate> tblList = 
ctx.getNextRequest().getTableUpdatesList();
-        for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) {
-          List<Long> epochs = 
cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>());
-          epochs.add(ctx.getIndex().getIndex());
-        }
-      }
+  static class RatisContext {
+    private final List<RequestContext> ctxs;
+    private final OMRequest req;
+    RatisContext(List<RequestContext> ctxs, OMRequest req) {
+      this.ctxs = ctxs;
+      this.req = req;
+    }
+    public List<RequestContext> getRequestContexts() {
+      return ctxs;
     }
-    // TODO: no-cache, no need cleanup cache
-    for (Map.Entry<String, List<Long>> entry : cleanupMap.entrySet()) {
-      
ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue());
+
+    public OMRequest getRequest() {
+      return req;
     }
   }
 }
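The merge stage above flushes its batch whenever adding the next request's table updates would cross ratisByteLimit, then starts a fresh builder. The control flow, reduced to its essentials (the item sizes and the limit below are placeholders):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchFlushDemo {
      private static final int BYTE_LIMIT = 100;  // stands in for ratisByteLimit

      public static void main(String[] args) {
        List<Integer> batch = new ArrayList<>();
        int size = 0;
        for (int itemSize : new int[] {40, 30, 50, 90}) {
          if (size + itemSize > BYTE_LIMIT) {
            // send the current batch before it would exceed the limit
            System.out.println("flush " + batch + " (" + size + " bytes)");
            batch.clear();
            size = 0;
          }
          batch.add(itemSize);
          size += itemSize;
        }
        if (!batch.isEmpty()) {
          System.out.println("flush final " + batch + " (" + size + " bytes)");
        }
      }
    }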
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
index fd2962ac5b..6b9cb06836 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
@@ -154,7 +154,10 @@ public class OMBasicStateMachine extends BaseStateMachine {
 
   @Override
   protected synchronized boolean updateLastAppliedTermIndex(TermIndex 
newTermIndex) {
-    return super.updateLastAppliedTermIndex(newTermIndex);
+    if (getLastAppliedTermIndex().getIndex() < newTermIndex.getIndex()) {
+      return super.updateLastAppliedTermIndex(newTermIndex);
+    }
+    return true;
   }
 
   /**
@@ -239,6 +242,14 @@ public class OMBasicStateMachine extends BaseStateMachine {
       final OMRequest request = context != null ? (OMRequest) context
           : OMRatisHelper.convertByteStringToOMRequest(
           trx.getStateMachineLogEntry().getLogData());
+      // Prepare has special handling for execution, as a snapshot is taken during this operation
+      // and applyTransaction must have finished before it can be handled.
+      // From the leader, it is guaranteed that no other execution is allowed.
+      if (request.getCmdType() == OzoneManagerProtocolProtos.Type.Prepare) {
+        return CompletableFuture.supplyAsync(() ->
+            OMRatisHelper.convertResponseToMessage(runCommand(request, 
termIndex, handler, ozoneManager)),
+            installSnapshotExecutor);
+      }
       OMResponse response = runCommand(request, termIndex, handler, 
ozoneManager);
       CompletableFuture<Message> future = new CompletableFuture<>();
       future.complete(OMRatisHelper.convertResponseToMessage(response));
@@ -377,11 +388,12 @@ public class OMBasicStateMachine extends BaseStateMachine 
{
         index = transactionInfo.getIndex();
       }
       try {
+        // the leader index is shared with followers for follower-side execution
         if (request.hasPersistDbRequest() && 
request.getPersistDbRequest().getIndexCount() > 0) {
           index = 
Math.max(Collections.max(request.getPersistDbRequest().getIndexList()).longValue(),
 index);
         }
+        // TODO: temporary way to pass the index used for object ID creation in certain follower executions
         TermIndex objectIndex = termIndex;
-        // TODO temp fix for index sharing from leader to follower in follower 
execution
         if (request.getCmdType() != OzoneManagerProtocolProtos.Type.PersistDb
             && request.getCmdType() != 
OzoneManagerProtocolProtos.Type.Prepare) {
           objectIndex = TermIndex.valueOf(termIndex.getTerm(), index);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
index 8942d04439..c2c9333f6d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
@@ -18,26 +18,20 @@ package org.apache.hadoop.ozone.om.ratis.execution;
 
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.TypedTable;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProcessingDetails;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.lock.OmLockOpr;
+import org.apache.hadoop.ozone.om.lock.OmRequestLockUtils;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -46,7 +40,6 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +52,7 @@ public class OMGateway {
   private final FollowerRequestExecutor followerExecutor;
   private final OzoneManager om;
   private final AtomicLong requestInProgress = new AtomicLong(0);
+  private final AtomicInteger leaderExecCurrIdx = new AtomicInteger(0);
   /**
    * uniqueIndex is used to generate index used in objectId creation uniquely 
accross OM nodes.
    * This makes use of termIndex for init shifted within 54 bits.
@@ -67,6 +61,8 @@ public class OMGateway {
 
   public OMGateway(OzoneManager om) throws IOException {
     this.om = om;
+    OmLockOpr.init(om.getThreadNamePrefix());
+    OmRequestLockUtils.init();
     this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex);
     this.followerExecutor = new FollowerRequestExecutor(om, uniqueIndex);
     if (om.isLeaderExecutorEnabled() && om.isRatisEnabled()) {
@@ -84,17 +80,22 @@ public class OMGateway {
       // for non-ratis flow, init with last index
       uniqueIndex.set(om.getLastTrxnIndexForNonRatis());
     }
+    if (om.isLeaderExecutorEnabled()) {
+      BucketQuotaResource.instance().enableTrack();
+    }
   }
   public void stop() {
     leaderExecutor.stop();
     followerExecutor.stop();
+    OmLockOpr.stop();
   }
   public OMResponse submit(OMRequest omRequest) throws ServiceException {
     if (!om.isLeaderReady()) {
-      String peerId = om.isRatisEnabled() ? 
om.getOmRatisServer().getRaftPeerId().toString() : om.getOMNodeId();
-      OMLeaderNotReadyException leaderNotReadyException = new 
OMLeaderNotReadyException(peerId
-          + " is not ready to process request yet.");
-      throw new ServiceException(leaderNotReadyException);
+      try {
+        om.checkLeaderStatus();
+      } catch (IOException e) {
+        throw new ServiceException(e);
+      }
     }
     executorEnable();
     RequestContext requestContext = new RequestContext();
@@ -103,40 +104,73 @@ public class OMGateway {
     requestContext.setFuture(new CompletableFuture<>());
     CompletableFuture<OMResponse> f = requestContext.getFuture()
         .whenComplete((r, th) -> handleAfterExecution(requestContext, th));
+    OmLockOpr lockOperation = OmRequestLockUtils.getLockOperation(om, 
omRequest);
     try {
-      // TODO gateway locking: take lock with OMLockDetails
       // TODO scheduling of request to pool
-      om.checkLeaderStatus();
-      validate(omRequest);
       OMClientRequest omClientRequest = 
OzoneManagerRatisUtils.createClientRequest(omRequest, om);
+      lockOperation.lock(om);
       requestContext.setClientRequest(omClientRequest);
 
+      validate(omRequest);
+      ensurePreviousRequestCompletionForPrepare(omRequest);
+
       // submit request
       ExecutorType executorType = executorSelector(omRequest);
       if (executorType == ExecutorType.LEADER_COMPATIBLE) {
-        leaderExecutor.submit(0, requestContext);
+        int idx = Math.abs(leaderExecCurrIdx.getAndIncrement() % 
leaderExecutor.batchSize());
+        leaderExecutor.submit(idx, requestContext);
       } else if (executorType == ExecutorType.FOLLOWER) {
         followerExecutor.submit(0, requestContext);
       } else {
         leaderExecutor.submit(0, requestContext);
       }
+
+      try {
+        return f.get();
+      } catch (ExecutionException ex) {
+        if (ex.getCause() != null) {
+          throw new ServiceException(ex.getMessage(), ex.getCause());
+        } else {
+          throw new ServiceException(ex.getMessage(), ex);
+        }
+      }
     } catch (InterruptedException e) {
-      requestContext.getFuture().completeExceptionally(e);
+      LOG.error("Interrupted while handling request", e);
       Thread.currentThread().interrupt();
+      throw new ServiceException(e.getMessage(), e);
+    } catch (ServiceException e) {
+      throw e;
     } catch (Throwable e) {
-      requestContext.getFuture().completeExceptionally(e);
+      LOG.error("Exception occurred while handling request", e);
+      throw new ServiceException(e.getMessage(), e);
+    } finally {
+      lockOperation.unlock();
+      Server.Call call = Server.getCurCall().get();
+      if (null != call) {
+        OMLockDetails lockDetails = lockOperation.getLockDetails();
+        call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKWAIT,
+            lockDetails.getWaitLockNanos(), TimeUnit.NANOSECONDS);
+        call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKSHARED,
+            lockDetails.getReadLockNanos(), TimeUnit.NANOSECONDS);
+        call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKEXCLUSIVE,
+            lockDetails.getWriteLockNanos(), TimeUnit.NANOSECONDS);
+      }
     }
-    try {
-      return f.get();
-    } catch (ExecutionException ex) {
-      throw new ServiceException(ex.getMessage(), ex);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new ServiceException(ex.getMessage(), ex);
+  }
+
+  private void ensurePreviousRequestCompletionForPrepare(OMRequest omRequest) 
throws InterruptedException {
+    // for a prepare request, other requests will already have been discarded before this is called
+    if (omRequest.getCmdType() == OzoneManagerProtocolProtos.Type.Prepare) {
+      for (int cnt = 0; cnt < 60 && requestInProgress.get() > 1; ++cnt) {
+        Thread.sleep(1000);
+      }
+      if (requestInProgress.get() > 1) {
+        LOG.warn("Still few requests {} are in progress, continuing with 
prepare", (requestInProgress.get() - 1));
+      }
     }
   }
 
-  private void validate(OMRequest omRequest) throws IOException {
+  private synchronized void validate(OMRequest omRequest) throws IOException {
     OzoneManagerRequestHandler.requestParamValidation(omRequest);
     // validate prepare state
     OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
@@ -165,14 +199,12 @@ public class OMGateway {
     }
   }
   private void handleAfterExecution(RequestContext ctx, Throwable th) {
-    // TODO: gateway locking: release lock and OMLockDetails update
     requestInProgress.decrementAndGet();
   }
 
   public void leaderChangeNotifier(String newLeaderId) {
     boolean isLeader = om.getOMNodeId().equals(newLeaderId);
     if (isLeader) {
-      cleanupCache();
       resetUniqueIndex();
     } else {
       leaderExecutor.disableProcessing();
@@ -194,77 +226,11 @@ public class OMGateway {
     }
   }
 
-  private void rebuildBucketVolumeCache() throws IOException {
-    LOG.info("Rebuild of bucket and volume cache");
-    Table<String, OmBucketInfo> bucketTable = 
om.getMetadataManager().getBucketTable();
-    Set<String> cachedBucketKeySet = new HashSet<>();
-    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> cacheItr = 
bucketTable.cacheIterator();
-    while (cacheItr.hasNext()) {
-      cachedBucketKeySet.add(cacheItr.next().getKey().getCacheKey());
-    }
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>> 
bucItr = bucketTable.iterator()) {
-      while (bucItr.hasNext()) {
-        Table.KeyValue<String, OmBucketInfo> next = bucItr.next();
-        bucketTable.addCacheEntry(next.getKey(), next.getValue(), -1);
-        cachedBucketKeySet.remove(next.getKey());
-      }
-    }
-
-    // removing extra cache entry
-    for (String key : cachedBucketKeySet) {
-      bucketTable.addCacheEntry(key, -1);
-    }
-
-    Set<String> cachedVolumeKeySet = new HashSet<>();
-    Table<String, OmVolumeArgs> volumeTable = 
om.getMetadataManager().getVolumeTable();
-    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmVolumeArgs>>> 
volCacheItr = volumeTable.cacheIterator();
-    while (volCacheItr.hasNext()) {
-      cachedVolumeKeySet.add(volCacheItr.next().getKey().getCacheKey());
-    }
-    try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>> 
volItr = volumeTable.iterator()) {
-      while (volItr.hasNext()) {
-        Table.KeyValue<String, OmVolumeArgs> next = volItr.next();
-        volumeTable.addCacheEntry(next.getKey(), next.getValue(), -1);
-        cachedVolumeKeySet.remove(next.getKey());
-      }
-    }
-
-    // removing extra cache entry
-    for (String key : cachedVolumeKeySet) {
-      volumeTable.addCacheEntry(key, -1);
-    }
-  }
-
-  public void cleanupCache() {
-    // TODO no-cache case, no need re-build bucket/volume cache and cleanup of 
cache
-    LOG.debug("clean all table cache and update bucket/volume with db");
-    for (String tbl : om.getMetadataManager().listTableNames()) {
-      Table table = om.getMetadataManager().getTable(tbl);
-      if (table instanceof TypedTable) {
-        ArrayList<Long> epochs = new ArrayList<>(((TypedTable<?, ?>) 
table).getCache().getEpochEntries().keySet());
-        if (!epochs.isEmpty()) {
-          table.cleanupCache(epochs);
-        }
-      }
-    }
-    try {
-      rebuildBucketVolumeCache();
-    } catch (IOException e) {
-      // retry once, else om down
-      try {
-        rebuildBucketVolumeCache();
-      } catch (IOException ex) {
-        String errorMessage = "OM unable to access rocksdb, terminating OM. 
Error " + ex.getMessage();
-        ExitUtils.terminate(1, errorMessage, ex, LOG);
-      }
-    }
-  }
   public void executorEnable() throws ServiceException {
     if (leaderExecutor.isProcessing()) {
       return;
     }
     if (requestInProgress.get() == 0) {
-      cleanupCache();
       leaderExecutor.enableProcessing();
     } else {
       LOG.warn("Executor is not enabled, previous request {} is still not 
cleaned", requestInProgress.get());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OmBucketInfoQuotaTracker.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OmBucketInfoQuotaTracker.java
new file mode 100644
index 0000000000..62d5e8098d
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OmBucketInfoQuotaTracker.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+
+/**
+ * Bucket info wrapper that tracks quota usage deltas against the shared bucket quota resource.
+ */
+public final class OmBucketInfoQuotaTracker extends OmBucketInfo {
+  private BucketQuotaResource.BucketQuota resource;
+  private long incUsedBytes;
+  private long incUsedNamespace;
+
+  public static OmBucketInfoQuotaTracker convert(OmBucketInfo info) {
+    Builder builder = info.toBuilder();
+    return createObject(info, builder);
+  }
+
+  private OmBucketInfoQuotaTracker(Builder b) {
+    super(b);
+    resource = BucketQuotaResource.instance().get(b.getObjectID());
+  }
+
+  @Override
+  public void incrUsedBytes(long bytes) {
+    incUsedBytes += bytes;
+    resource.addUsedBytes(bytes);
+  }
+
+  @Override
+  public void incrUsedNamespace(long namespaceToUse) {
+    incUsedNamespace += namespaceToUse;
+    resource.addUsedNamespace(namespaceToUse);
+  }
+
+  @Override
+  public long getUsedBytes() {
+    return resource.getUsedBytes() + super.getUsedBytes();
+  }
+
+  @Override
+  public long getUsedNamespace() {
+    return resource.getUsedNamespace() + super.getUsedNamespace();
+  }
+
+  public long getIncUsedBytes() {
+    return incUsedBytes;
+  }
+  
+  public long getIncUsedNamespace() {
+    return incUsedNamespace;
+  }
+
+  /**
+   * Resets the tracked usage deltas, since the bucket info in DB and cache is updated on flush.
+   */
+  public void reset() {
+    resource.addUsedBytes(-incUsedBytes);
+    resource.addUsedNamespace(-incUsedNamespace);
+  }
+
+  @Override
+  public OmBucketInfoQuotaTracker copyObject() {
+    Builder builder = toBuilder();
+    return createObject(this, builder);
+  }
+
+  private static OmBucketInfoQuotaTracker createObject(OmBucketInfo info, 
Builder builder) {
+    if (info.getEncryptionKeyInfo() != null) {
+      builder.setBucketEncryptionKey(info.getEncryptionKeyInfo().copy());
+    }
+
+    if (info.getDefaultReplicationConfig() != null) {
+      
builder.setDefaultReplicationConfig(info.getDefaultReplicationConfig().copy());
+    }
+
+    Preconditions.checkNotNull(info.getVolumeName());
+    Preconditions.checkNotNull(info.getBucketName());
+    Preconditions.checkNotNull(info.getAcls());
+    Preconditions.checkNotNull(info.getStorageType());
+    return new OmBucketInfoQuotaTracker(builder);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
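
A minimal usage sketch of the tracker (illustrative only, not code from this
patch; the builder calls and example values are assumptions about OmBucketInfo's
API): a request wraps the cached bucket info, increments flow into the shared
BucketQuotaResource so concurrent requests see them, and reset() undoes the
in-memory share once the delta has been flushed to the DB.

    import java.util.Collections;
    import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
    import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;

    public final class QuotaTrackerSketch {
      public static void main(String[] args) {
        // convert() requires volume, bucket, acls and storage type to be set
        // (see the Preconditions checks above).
        OmBucketInfo cached = OmBucketInfo.newBuilder()
            .setVolumeName("vol1")
            .setBucketName("bucket1")
            .setObjectID(1001L)
            .setAcls(Collections.emptyList())
            .setStorageType(org.apache.hadoop.hdds.protocol.StorageType.DISK)
            .build();
        OmBucketInfoQuotaTracker tracked = OmBucketInfoQuotaTracker.convert(cached);
        tracked.incrUsedBytes(4096);   // delta is also published to BucketQuotaResource
        tracked.incrUsedNamespace(1);
        // Effective usage = shared in-flight delta + the snapshot's own value.
        System.out.println("usedBytes=" + tracked.getUsedBytes());
        // After OMPersistDbRequest persists the delta, undo the in-memory share.
        tracked.reset();
      }
    }
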
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
index fe9ae728fa..45a461b849 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
@@ -28,29 +28,29 @@ import java.util.function.BiConsumer;
 /**
  * Pool executor.
  */
-public class PoolExecutor <T> {
+public class PoolExecutor <T, Q> {
   private Thread[] threadPool;
   private List<BlockingQueue<T>> queues;
-  private BiConsumer<Collection<T>, PoolExecutor<T>> handler = null;
-  private PoolExecutor<T> nxtPool;
+  private BiConsumer<Collection<T>, CheckedConsumer<Q>> handler = null;
+  private CheckedConsumer<Q> submitter;
   private AtomicBoolean isRunning = new AtomicBoolean(true);
 
   private PoolExecutor(int poolSize, int queueSize, String threadPrefix) {
     threadPool = new Thread[poolSize];
     queues = new ArrayList<>(poolSize);
     for (int i = 0; i < poolSize; ++i) {
-      LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(1000);
+      LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(queueSize);
       queues.add(queue);
-      threadPool[i] = new Thread(() -> execute(queue), threadPrefix + "OMExecutor-" + i);
+      threadPool[i] = new Thread(() -> execute(queue), threadPrefix + "-" + i);
       threadPool[i].start();
     }
   }
   public PoolExecutor(
-      int poolSize, int queueSize, String threadPrefix, BiConsumer<Collection<T>, PoolExecutor<T>> handler,
-      PoolExecutor<T> nxtPool) {
+      int poolSize, int queueSize, String threadPrefix, BiConsumer<Collection<T>, CheckedConsumer<Q>> handler,
+      CheckedConsumer<Q> submitter) {
     this(poolSize, queueSize, threadPrefix);
     this.handler = handler;
-    this.nxtPool = nxtPool;
+    this.submitter = submitter;
   }
   public void submit(int idx, T task) throws InterruptedException {
     if (idx < 0 || idx >= threadPool.length) {
@@ -66,7 +66,7 @@ public class PoolExecutor <T> {
         T task = q.take();
         entries.add(task);
         q.drainTo(entries);
-        handler.accept(entries, nxtPool);
+        handler.accept(entries, submitter);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         break;
@@ -84,4 +84,12 @@ public class PoolExecutor <T> {
       }
     }
   }
+
+  /**
+   * checked functional interface.
+   */
+  @FunctionalInterface
+  public interface CheckedConsumer<S> {
+    void accept(S s) throws InterruptedException;
+  }
 }
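
The submitter indirection replaces the hard-wired next-pool reference; a sketch
of chaining two stages under that contract (the stage names, payload type and
pool sizes are invented for illustration, not taken from this patch):

    import org.apache.hadoop.ozone.om.ratis.execution.PoolExecutor;

    public final class PoolExecutorSketch {
      public static void main(String[] args) throws InterruptedException {
        // Terminal stage: consumes batches; no next-stage submitter is needed.
        PoolExecutor<String, Void> persistStage = new PoolExecutor<>(
            1, 1000, "PersistStage",
            (batch, next) -> System.out.println("persisting " + batch.size() + " entries"),
            null);
        // First stage: drains a batch, then forwards each entry through the
        // CheckedConsumer instead of holding a PoolExecutor reference.
        PoolExecutor<String, String> validateStage = new PoolExecutor<>(
            2, 1000, "ValidateStage",
            (batch, next) -> {
              for (String entry : batch) {
                try {
                  next.accept(entry);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
                }
              }
            },
            entry -> persistStage.submit(0, entry));
        validateStage.submit(0, "request-1");
        // Worker threads are non-daemon; shutdown handling is elided here.
      }
    }
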
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 663f350718..c3dbed9e67 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -514,12 +514,11 @@ public final class OzoneManagerRatisUtils {
 
   public static OzoneManagerProtocolProtos.OMResponse submitRequest(
       OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
+    if (om.isLeaderExecutorEnabled()) {
+      return om.getOMGateway().submit(omRequest);
+    }
     if (om.isRatisEnabled()) {
-      if (om.isLeaderExecutorEnabled()) {
-        return om.getOMGateway().submit(omRequest);
-      } else {
-        return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
-      }
+      return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
     } else {
      return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest);
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 17f9663ae1..05e1066271 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OmUtils;
@@ -586,4 +587,7 @@ public abstract class OMClientRequest implements RequestAuditor {
   public void mergeOmLockDetails(OMLockDetails details) {
     omLockDetails.merge(details);
   }
+  public OmBucketInfo getWrappedBucketInfo() {
+    return null;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
index 29e57ae916..697c86cf21 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.ozone.audit.OMSystemAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -41,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
  * Handles OMPersistDbRequest.
@@ -66,15 +70,25 @@ public class OMPersistDbRequest extends OMClientRequest {
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
     OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    Table<String, OmBucketInfo> bucketTable = metadataManager.getBucketTable();
+    Table<String, OmVolumeArgs> volumeTable = metadataManager.getVolumeTable();
     OzoneManagerProtocolProtos.PersistDbRequest dbUpdateRequest = getOmRequest().getPersistDbRequest();
+    List<OzoneManagerProtocolProtos.DBTableUpdate> tableUpdatesList = dbUpdateRequest.getTableUpdatesList();
 
-    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
     try (BatchOperation batchOperation = metadataManager.getStore()
         .initBatchOperation()) {
-      List<OzoneManagerProtocolProtos.DBTableUpdate> tableUpdatesList = dbUpdateRequest.getTableUpdatesList();
       for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tableUpdatesList) {
         Table table = metadataManager.getTable(tblUpdates.getTableName());
         List<OzoneManagerProtocolProtos.DBTableRecord> recordsList = tblUpdates.getRecordsList();
+        if (bucketTable.getName().equals(tblUpdates.getTableName())) {
+          updateBucketRecord(bucketTable, batchOperation, recordsList);
+          continue;
+        }
+        if (volumeTable.getName().equals(tblUpdates.getTableName())) {
+          updateVolumeRecord(volumeTable, batchOperation, recordsList);
+          continue;
+        }
         for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
           if (record.hasValue()) {
             // put
@@ -86,6 +100,28 @@ public class OMPersistDbRequest extends OMClientRequest {
           }
         }
       }
+      for (OzoneManagerProtocolProtos.BucketQuotaCount quota : dbUpdateRequest.getBucketQuotaCountList()) {
+        String bucketKey = metadataManager.getBucketKey(quota.getVolName(), quota.getBucketName());
+        // TODO remove bucket lock
+        metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, quota.getVolName(), quota.getBucketName());
+        try {
+          OmBucketInfo bucketInfo = bucketTable.get(bucketKey);
+          if (bucketInfo instanceof OmBucketInfoQuotaTracker) {
+            bucketInfo = bucketTable.getSkipCache(bucketKey);
+          }
+          if (null == bucketInfo || bucketInfo.getObjectID() != quota.getBucketObjectId()) {
+            continue;
+          }
+          bucketInfo.incrUsedBytes(quota.getDiffUsedBytes());
+          bucketInfo.incrUsedNamespace(quota.getDiffUsedNamespace());
+          bucketTable.putWithBatch(batchOperation, bucketKey, bucketInfo);
+          bucketTable.addCacheEntry(bucketKey, bucketInfo, -1);
+          LOG.debug("Updated bucket quota {}-{} for key {}", 
quota.getDiffUsedBytes(), quota.getDiffUsedNamespace(),
+              quota.getBucketName());
+        } finally {
+          metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, quota.getVolName(), quota.getBucketName());
+        }
+      }
       long txIndex = 0;
      TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(metadataManager);
       if (transactionInfo != null && transactionInfo.getIndex() != null) {
@@ -96,17 +132,59 @@ public class OMPersistDbRequest extends OMClientRequest {
          batchOperation, TRANSACTION_INFO_KEY, TransactionInfo.valueOf(termIndex, txIndex));
      metadataManager.getStore().commitBatchOperation(batchOperation);
      omResponse.setPersistDbResponse(OzoneManagerProtocolProtos.PersistDbResponse.newBuilder().build());
-      refreshCache(ozoneManager, tableUpdatesList);
     } catch (IOException ex) {
       audit(ozoneManager, dbUpdateRequest, termIndex, ex);
       LOG.error("Db persist exception", ex);
       return new DummyOMClientResponse(createErrorOMResponse(omResponse, ex));
+    } finally {
+      bucketTable.cleanupCache(Collections.singletonList(Long.MAX_VALUE));
+      volumeTable.cleanupCache(Collections.singletonList(Long.MAX_VALUE));
     }
     audit(ozoneManager, dbUpdateRequest, termIndex, null);
     OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse.build());
     return omClientResponse;
   }
 
+  private static void updateBucketRecord(
+      Table<String, OmBucketInfo> bucketTable, BatchOperation batchOperation,
+      List<OzoneManagerProtocolProtos.DBTableRecord> recordsList) throws IOException {
+    for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
+      String key = record.getKey().toStringUtf8();
+      if (record.hasValue()) {
+        OmBucketInfo updateInfo = OmBucketInfo.getCodec().fromPersistedFormat(record.getValue().toByteArray());
+        OmBucketInfo bucketInfo = bucketTable.getSkipCache(key);
+        if (null != bucketInfo) {
+          updateInfo.incrUsedBytes(-updateInfo.getUsedBytes() + bucketInfo.getUsedBytes());
+          updateInfo.incrUsedNamespace(-updateInfo.getUsedNamespace() + bucketInfo.getUsedNamespace());
+          bucketTable.put(key, updateInfo);
+        } else {
+          bucketTable.getRawTable().putWithBatch(batchOperation, record.getKey().toByteArray(),
+              record.getValue().toByteArray());
+          bucketTable.addCacheEntry(key, updateInfo, -1);
+        }
+      } else {
+        bucketTable.getRawTable().deleteWithBatch(batchOperation, record.getKey().toByteArray());
+        bucketTable.addCacheEntry(key, -1);
+      }
+    }
+  }
+  private static void updateVolumeRecord(
+      Table<String, OmVolumeArgs> volumeTable, BatchOperation batchOperation,
+      List<OzoneManagerProtocolProtos.DBTableRecord> recordsList) throws IOException {
+    for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
+      String key = record.getKey().toStringUtf8();
+      if (record.hasValue()) {
+        OmVolumeArgs updateInfo = OmVolumeArgs.getCodec().fromPersistedFormat(record.getValue().toByteArray());
+        volumeTable.getRawTable().putWithBatch(batchOperation, record.getKey().toByteArray(),
+            record.getValue().toByteArray());
+        volumeTable.addCacheEntry(key, updateInfo, -1);
+      } else {
+        volumeTable.getRawTable().deleteWithBatch(batchOperation, record.getKey().toByteArray());
+        volumeTable.addCacheEntry(key, -1);
+      }
+    }
+  }
+
   public void audit(OzoneManager ozoneManager, OzoneManagerProtocolProtos.PersistDbRequest request,
                           TermIndex termIndex, Throwable th) {
     List<Long> indexList = request.getIndexList();
@@ -121,8 +199,4 @@ public class OMPersistDbRequest extends OMClientRequest {
           OMSystemAction.DBPERSIST, auditMap));
     }
   }
-
-  private void refreshCache(OzoneManager om, List<OzoneManagerProtocolProtos.DBTableUpdate> tblUpdateList) {
-    // TODO no-cache, update bucket and volume cache as full table cache in no-cache
-  }
 }
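
On the producing side, the BucketQuotaCount entries consumed above would
plausibly be assembled from each tracker's accumulated increments. A sketch:
only the getters appear in this patch, so the protobuf builder setters here are
assumptions from standard codegen, not confirmed API.

    import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;
    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;

    final class QuotaDeltaSketch {
      // Fold one tracker's deltas into the per-bucket record that
      // PersistDbRequest carries to every node.
      static OzoneManagerProtocolProtos.BucketQuotaCount toQuotaCount(
          OmBucketInfoQuotaTracker tracked) {
        return OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder()
            .setVolName(tracked.getVolumeName())
            .setBucketName(tracked.getBucketName())
            .setBucketObjectId(tracked.getObjectID())
            .setDiffUsedBytes(tracked.getIncUsedBytes())
            .setDiffUsedNamespace(tracked.getIncUsedNamespace())
            .build();
      }
    }
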
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 88c5ad9140..d679b39dfb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
+import org.apache.hadoop.ozone.om.ratis.execution.BucketQuotaResource;
+import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;
 import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
@@ -113,6 +115,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
   public static final Logger LOG = LoggerFactory.getLogger(OMKeyRequest.class);
 
   private BucketLayout bucketLayout = BucketLayout.DEFAULT;
+  private OmBucketInfo wrappedBucketInfo = null;
 
   public OMKeyRequest(OMRequest omRequest) {
     super(omRequest);
@@ -707,8 +710,17 @@ public abstract class OMKeyRequest extends OMClientRequest {
 
     CacheValue<OmBucketInfo> value = omMetadataManager.getBucketTable()
         .getCacheValue(new CacheKey<>(bucketKey));
+    OmBucketInfo cachedBucketInfo = value != null ? value.getCacheValue() : null;
+    if (null != cachedBucketInfo && BucketQuotaResource.instance().isEnabledTrack()) {
+      wrappedBucketInfo = OmBucketInfoQuotaTracker.convert(cachedBucketInfo);
+      return wrappedBucketInfo;
+    }
+    return cachedBucketInfo;
+  }
 
-    return value != null ? value.getCacheValue() : null;
+  @Override
+  public OmBucketInfo getWrappedBucketInfo() {
+    return wrappedBucketInfo;
   }
 
   /**
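
Since getBucketInfo now hands out the wrapper, the executor can read the
per-request deltas back through getWrappedBucketInfo(). A sketch of that
read-back (a hypothetical helper, not code from this patch):

    import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
    import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;
    import org.apache.hadoop.ozone.om.request.OMClientRequest;

    final class QuotaHarvestSketch {
      // Returns {bytesDelta, namespaceDelta} accumulated by a finished request,
      // or null when quota tracking was not enabled for it.
      static long[] harvest(OMClientRequest request) {
        OmBucketInfo wrapped = request.getWrappedBucketInfo();
        if (wrapped instanceof OmBucketInfoQuotaTracker) {
          OmBucketInfoQuotaTracker tracker = (OmBucketInfoQuotaTracker) wrapped;
          return new long[] {tracker.getIncUsedBytes(), tracker.getIncUsedNamespace()};
        }
        return null;
      }
    }
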
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 597a40006f..900dee13f6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -216,16 +216,6 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
                       omBucketInfo.getDefaultReplicationConfig() :
                       null, ozoneManager);
 
-          OmMultipartKeyInfo multipartKeyInfoFromArgs =
-              new OmMultipartKeyInfo.Builder()
-                  .setUploadID(keyArgs.getMultipartUploadID())
-                  .setCreationTime(keyArgs.getModificationTime())
-                  .setReplicationConfig(replicationConfig)
-                  .setObjectID(pathInfoFSO.getLeafNodeObjectId())
-                  .setUpdateID(trxnLogIndex)
-                  .setParentID(pathInfoFSO.getLastKnownParentId())
-                  .build();
-
           OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder()
               .setVolumeName(volumeName)
               .setBucketName(bucketName)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 0293d1f880..4d885d08cf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -89,6 +89,8 @@ public class OMPrepareRequest extends OMClientRequest {
     Exception exception = null;
 
     try {
+      // Enable the prepare gate on every node, leader and followers alike.
+      ozoneManager.getPrepareState().enablePrepareGate();
       PrepareResponse omResponse = PrepareResponse.newBuilder().setTxnID(transactionLogIndex).build();
       responseBuilder.setPrepareResponse(omResponse);
       response = new DummyOMClientResponse(responseBuilder.build());
@@ -104,9 +106,25 @@ public class OMPrepareRequest extends OMClientRequest {
       ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY,
           TransactionInfo.valueOf(termIndex, index));
 
+      // Update the Ratis state machine term index to the current one.
       OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
+      omRatisServer.getOmBasicStateMachine().notifyTermIndexUpdated(termIndex.getTerm(), termIndex.getIndex());
+      Duration flushTimeout =
+          Duration.of(omRequest.getPrepareRequest().getArgs().getTxnApplyWaitTimeoutSeconds(), ChronoUnit.SECONDS);
+      Duration flushInterval =
+          Duration.of(omRequest.getPrepareRequest().getArgs().getTxnApplyCheckIntervalSeconds(), ChronoUnit.SECONDS);
+      long endTime = System.currentTimeMillis() + flushTimeout.toMillis();
+      while (omRatisServer.getOmBasicStateMachine().getLastAppliedTermIndex().getIndex() < (transactionLogIndex + 1)
+          && System.currentTimeMillis() < endTime) {
+        Thread.sleep(flushInterval.toMillis());
+      }
+      if (omRatisServer.getOmBasicStateMachine().getLastAppliedTermIndex().getIndex() < (transactionLogIndex + 1)) {
+        throw new IOException(String.format("After waiting for %d seconds, Ratis state machine applied index %d " +
+            "which is less than the minimum required index %d.", flushTimeout.getSeconds(),
+            omRatisServer.getOmBasicStateMachine().getLastAppliedTermIndex().getIndex(), (transactionLogIndex + 1)));
+      }
       final RaftServer.Division division = omRatisServer.getServerDivision();
-      takeSnapshotAndPurgeLogs(transactionLogIndex, division);
+      takeSnapshotAndPurgeLogs(transactionLogIndex + 1, division);
 
       // Save prepare index to a marker file, so if the OM restarts,
       // it will remain in prepare mode as long as the file exists and its
@@ -119,7 +137,7 @@ public class OMPrepareRequest extends OMClientRequest {
       exception = e;
       LOG.error("Prepare Request Apply failed in {}. ", 
ozoneManager.getOMNodeId(), e);
       response = new 
DummyOMClientResponse(createErrorOMResponse(responseBuilder, e));
-    } catch (IOException e) {
+    } catch (InterruptedException | IOException e) {
       // Set error code so that prepare failure does not cause the OM to terminate.
       exception = e;
       LOG.error("Prepare Request Apply failed in {}. ", 
ozoneManager.getOMNodeId(), e);
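
The apply-wait added above reduces to a small polling pattern; a self-contained
sketch with the same timeout/interval semantics (the helper and its names are
illustrative, not part of the commit):

    import java.time.Duration;
    import java.util.function.LongSupplier;

    final class ApplyWaitSketch {
      // Poll an applied-index supplier until it reaches targetIndex or the
      // timeout elapses; returns whether the target was reached.
      static boolean waitForApply(LongSupplier appliedIndex, long targetIndex,
          Duration timeout, Duration interval) throws InterruptedException {
        long endTime = System.currentTimeMillis() + timeout.toMillis();
        while (appliedIndex.getAsLong() < targetIndex
            && System.currentTimeMillis() < endTime) {
          Thread.sleep(interval.toMillis());
        }
        return appliedIndex.getAsLong() >= targetIndex;
      }
    }
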
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 723aaa7bc4..43c855ebda 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -302,7 +302,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements OzoneManagerP
   /**
    * Submits request directly to OM.
    */
-  private OMResponse submitRequestDirectlyToOM(OMRequest request) {
+  private OMResponse submitRequestDirectlyToOM(OMRequest request) throws ServiceException {
     final OMClientResponse omClientResponse;
     try {
       if (OmUtils.isReadOnly(request)) {
@@ -317,6 +317,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements OzoneManagerP
           OMAuditLogger.log(omClientRequest.getAuditBuilder());
           throw ex;
         }
+        if (ozoneManager.isLeaderExecutorEnabled()) {
+          return submitRequestToRatis(request);
+        }
         final TermIndex termIndex = TransactionInfo.getTermIndex(transactionIndex.incrementAndGet());
         omClientResponse = handler.handleWriteRequest(request, termIndex, ozoneManagerDoubleBuffer);
       }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 04e8efa7b7..1b022747ac 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -211,6 +211,6 @@ public class TestDirectoryDeletingService {
           "base: " + delDirCnt[0] + ", new: " + delDirCnt[1]);
       delDirCnt[0] =  delDirCnt[1];
       return dirDeletingService.getDeletedDirsCount() >= dirCreatesCount;
-    }, 500, 300000);
+    }, 100, 300000);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
