This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch LeaderExecutor_Feature
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/LeaderExecutor_Feature by this
push:
new 38ddeff9aa HDDS-11403. granular lock and cache (#7271)
38ddeff9aa is described below
commit 38ddeff9aa96cc2a08cb001c9735fe2bef6e0a71
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Oct 11 21:48:02 2024 +0530
HDDS-11403. granular lock and cache (#7271)
---
.../apache/hadoop/hdds/utils/db/TypedTable.java | 5 +-
.../hadoop/hdds/utils/db/cache/NoTableCache.java | 102 +++++
.../hadoop/hdds/utils/db/cache/TableCache.java | 3 +-
.../hadoop/hdds/utils/db/cache/TestTableCache.java | 27 +-
.../hadoop/ozone/om/helpers/OmBucketInfo.java | 4 +-
.../om/snapshot/TestOzoneManagerHASnapshot.java | 5 +-
.../hadoop/ozone/shell/TestOzoneTenantShell.java | 5 -
.../src/main/proto/OmClientProtocol.proto | 2 +
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 54 ++-
.../hadoop/ozone/om/lock/FSOPrefixLocking.java | 239 ++++++++++++
.../apache/hadoop/ozone/om/lock/KeyLocking.java | 131 +++++++
.../org/apache/hadoop/ozone/om/lock/OmLockOpr.java | 411 +++++++++++++++++++++
.../hadoop/ozone/om/lock/OmRequestLockUtils.java | 225 +++++++++++
.../om/ratis/execution/BucketQuotaResource.java | 78 ++++
.../ratis/execution/FollowerRequestExecutor.java | 19 +-
.../om/ratis/execution/LeaderRequestExecutor.java | 212 ++++++++---
.../om/ratis/execution/OMBasicStateMachine.java | 16 +-
.../hadoop/ozone/om/ratis/execution/OMGateway.java | 166 ++++-----
.../ratis/execution/OmBucketInfoQuotaTracker.java | 111 ++++++
.../ozone/om/ratis/execution/PoolExecutor.java | 26 +-
.../om/ratis/utils/OzoneManagerRatisUtils.java | 9 +-
.../hadoop/ozone/om/request/OMClientRequest.java | 4 +
.../ozone/om/request/OMPersistDbRequest.java | 88 ++++-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 14 +-
.../S3MultipartUploadCompleteRequest.java | 10 -
.../ozone/om/request/upgrade/OMPrepareRequest.java | 22 +-
...OzoneManagerProtocolServerSideTranslatorPB.java | 5 +-
.../om/service/TestDirectoryDeletingService.java | 2 +-
28 files changed, 1750 insertions(+), 245 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index b8c4fe9b0a..f04e45254c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheResult;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.hdds.utils.db.cache.FullTableCache;
+import org.apache.hadoop.hdds.utils.db.cache.NoTableCache;
import org.apache.hadoop.hdds.utils.db.cache.PartialTableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
@@ -125,6 +126,8 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
CacheValue.get(EPOCH_DEFAULT, kv.getValue()));
}
}
+ } else if (cacheType == CacheType.NO_CACHE) {
+ cache = new NoTableCache<>();
} else {
cache = new PartialTableCache<>(threadNamePrefix);
}
@@ -549,7 +552,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
}
@VisibleForTesting
- public TableCache<KEY, VALUE> getCache() {
+ TableCache<KEY, VALUE> getCache() {
return cache;
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/NoTableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/NoTableCache.java
new file mode 100644
index 0000000000..c96c9b0b36
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/NoTableCache.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.utils.db.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
+import org.apache.hadoop.hdds.annotation.InterfaceStability.Evolving;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A no-op {@link TableCache} implementation that stores nothing.
+ * @param <KEY> cache key type
+ * @param <VALUE> cache value type
+ */
+@Private
+@Evolving
+public class NoTableCache<KEY, VALUE> implements TableCache<KEY, VALUE> {
+ public static final Logger LOG = LoggerFactory.getLogger(NoTableCache.class);
+
+ public NoTableCache() {
+ }
+
+ @Override
+ public CacheValue<VALUE> get(CacheKey<KEY> cachekey) {
+ return null;
+ }
+
+ @Override
+ public void loadInitial(CacheKey<KEY> key, CacheValue<VALUE> value) {
+    // No-op: this cache never stores entries.
+ }
+
+ @Override
+ public void put(CacheKey<KEY> cacheKey, CacheValue<VALUE> value) {
+ }
+
+ @Override
+ public void cleanup(List<Long> epochs) {
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> iterator() {
+    Map<CacheKey<KEY>, CacheValue<VALUE>> emptyMap = Collections.emptyMap();
+    return emptyMap.entrySet().iterator();
+ }
+
+ @VisibleForTesting
+ @Override
+ public void evictCache(List<Long> epochs) {
+ }
+
+ @Override
+ public CacheResult<VALUE> lookup(CacheKey<KEY> cachekey) {
+ return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST, null);
+ }
+
+ @VisibleForTesting
+ @Override
+ public NavigableMap<Long, Set<CacheKey<KEY>>> getEpochEntries() {
+ return new ConcurrentSkipListMap<>();
+ }
+
+ @Override
+ public CacheStats getStats() {
+ return new CacheStats(0, 0, 0);
+ }
+
+ @Override
+ public CacheType getCacheType() {
+    return CacheType.NO_CACHE;
+ }
+}
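Since get() above always returns null and lookup() always answers MAY_EXIST, a table
configured with this cache behaves as if the cache layer were absent: every read falls
through to RocksDB, and cleanup/eviction are no-ops. A minimal sketch of that contract
(the key and value strings are made up for illustration):

    TableCache<String, String> cache = new NoTableCache<>();
    cache.put(new CacheKey<>("k1"), CacheValue.get(1L, "v1"));  // silently dropped
    assert cache.size() == 0;                                   // nothing is retained
    assert cache.get(new CacheKey<>("k1")) == null;             // caller must read the DB
    CacheResult<String> result = cache.lookup(new CacheKey<>("k1"));
    assert result.getCacheStatus() == CacheResult.CacheStatus.MAY_EXIST;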
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
index 62de974eac..c2b9541c8f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
@@ -116,7 +116,8 @@ public interface TableCache<KEY, VALUE> {
enum CacheType {
FULL_CACHE, // This mean's the table maintains full cache. Cache and DB
// state are same.
-    PARTIAL_CACHE // This is partial table cache, cache state is partial state
+    PARTIAL_CACHE, // This is partial table cache, cache state is partial state
     // compared to DB state.
+    NO_CACHE // no entries are cached; reads always fall through to the DB
}
}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
index 3f645f90e7..5c8c313ffd 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
@@ -27,10 +27,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
@@ -54,8 +55,14 @@ public class TestTableCache {
}
}
+ static Stream<TableCache.CacheType> tableCacheTypes() {
+ return Stream.of(
+ TableCache.CacheType.FULL_CACHE,
+ TableCache.CacheType.PARTIAL_CACHE
+ );
+ }
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testPartialTableCache(TableCache.CacheType cacheType) {
createTableCache(cacheType);
@@ -99,7 +106,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testTableCacheWithRenameKey(TableCache.CacheType cacheType) {
createTableCache(cacheType);
@@ -155,7 +162,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testPartialTableCacheWithNotContinuousEntries(
TableCache.CacheType cacheType) {
@@ -206,7 +213,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testPartialTableCacheWithOverrideEntries(
TableCache.CacheType cacheType) {
@@ -277,7 +284,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testPartialTableCacheWithOverrideAndDelete(
TableCache.CacheType cacheType) {
@@ -374,7 +381,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testPartialTableCacheParallel(
TableCache.CacheType cacheType) throws Exception {
@@ -458,7 +465,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testTableCache(TableCache.CacheType cacheType) {
createTableCache(cacheType);
@@ -491,7 +498,7 @@ public class TestTableCache {
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testTableCacheWithNonConsecutiveEpochList(
TableCache.CacheType cacheType) {
@@ -562,7 +569,7 @@ public class TestTableCache {
}
@ParameterizedTest
- @EnumSource(TableCache.CacheType.class)
+ @MethodSource("tableCacheTypes")
public void testTableCacheStats(TableCache.CacheType cacheType) {
createTableCache(cacheType);
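The switch from @EnumSource to the tableCacheTypes() @MethodSource keeps the new
NO_CACHE type out of these shared tests, whose assertions assume entries are retained
after put(). A NO_CACHE-specific check might look like this sketch (not part of this
commit):

    @Test
    public void testNoTableCacheRetainsNothing() {
      TableCache<String, String> cache = new NoTableCache<>();
      cache.put(new CacheKey<>("0"), CacheValue.get(0, "0"));
      assertEquals(0, cache.size());
      assertNull(cache.get(new CacheKey<>("0")));
    }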
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index 5a83f6dbba..7c911ae3e9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -44,7 +44,7 @@ import com.google.common.base.Preconditions;
/**
* A class that encapsulates Bucket Info.
*/
-public final class OmBucketInfo extends WithObjectID implements Auditable, CopyObject<OmBucketInfo> {
+public class OmBucketInfo extends WithObjectID implements Auditable, CopyObject<OmBucketInfo> {
private static final Codec<OmBucketInfo> CODEC = new DelegatedCodec<>(
Proto2Codec.get(BucketInfo.getDefaultInstance()),
OmBucketInfo::getFromProtobuf,
@@ -110,7 +110,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable, CopyO
private String owner;
- private OmBucketInfo(Builder b) {
+ protected OmBucketInfo(Builder b) {
super(b);
this.volumeName = b.volumeName;
this.bucketName = b.bucketName;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java
index 12c5d1e61e..447034f256 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java
@@ -108,7 +108,6 @@ public class TestOzoneManagerHASnapshot {
// Test snapshot diff when OM restarts in HA OM env.
@Test
- @Unhealthy("HDDS-11415 follower cache update")
public void testSnapshotDiffWhenOmLeaderRestart()
throws Exception {
String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
@@ -165,7 +164,6 @@ public class TestOzoneManagerHASnapshot {
}
@Test
- @Unhealthy("HDDS-11415 follower cache update")
public void testSnapshotIdConsistency() throws Exception {
createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
@@ -203,7 +201,6 @@ public class TestOzoneManagerHASnapshot {
* passed or empty.
*/
@Test
- @Unhealthy("HDDS-11415 follower cache update")
public void testSnapshotNameConsistency() throws Exception {
store.createSnapshot(volumeName, bucketName, "");
List<OzoneManager> ozoneManagers = cluster.getOzoneManagersList();
@@ -286,7 +283,7 @@ public class TestOzoneManagerHASnapshot {
* and purgeSnapshot in same batch.
*/
@Test
- @Unhealthy("HDDS-11415 om statemachine change and follower cache update")
+ @Unhealthy("HDDS-11415 om statemachine change, remove")
public void testKeyAndSnapshotDeletionService() throws IOException,
InterruptedException, TimeoutException {
OzoneManager omLeader = cluster.getOMLeader();
OzoneManager omFollower;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index f253f79935..5d64750714 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantCreateRequest;
import org.apache.hadoop.ozone.shell.tenant.TenantShell;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
-import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
@@ -357,7 +356,6 @@ public class TestOzoneTenantShell {
}
@Test
- @Unhealthy("HDDS-11415 follower cache issue validation fail")
public void testAssignAdmin() throws IOException {
final String tenantName = "devaa";
@@ -413,7 +411,6 @@ public class TestOzoneTenantShell {
* and revoke user flow.
*/
@Test
- @Unhealthy("HDDS-11415 follower cache issue validation fail")
@SuppressWarnings("methodlength")
public void testOzoneTenantBasicOperations() throws IOException {
@@ -688,7 +685,6 @@ public class TestOzoneTenantShell {
}
@Test
- @Unhealthy("HDDS-11415 follower cache issue validation fail")
public void testListTenantUsers() throws IOException {
executeHA(tenantShell, new String[] {"--verbose", "create", "tenant1"});
checkOutput(out, "{\n" +
@@ -769,7 +765,6 @@ public class TestOzoneTenantShell {
}
@Test
- @Unhealthy("HDDS-11415 follower cache issue validation fail")
public void testTenantSetSecret() throws IOException, InterruptedException {
final String tenantName = "tenant-test-set-secret";
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index da99dba59e..09cf20cf4f 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -2235,6 +2235,7 @@ message BucketQuotaCount {
required int64 diffUsedBytes = 3;
required int64 diffUsedNamespace = 4;
required bool supportOldQuota = 5 [default=false];
+ optional uint64 bucketObjectId = 6;
}
message QuotaRepairResponse {
@@ -2264,6 +2265,7 @@ message OMLockDetailsProto {
message PersistDbRequest {
repeated DBTableUpdate tableUpdates = 1;
repeated int64 index = 2;
+ repeated BucketQuotaCount bucketQuotaCount = 3;
}
message DBTableUpdate {
required string tableName = 1;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 4873a7db49..e46844fb0e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -329,6 +329,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
private SnapshotChainManager snapshotChainManager;
private final OMPerformanceMetrics perfMetrics;
private final S3Batcher s3Batcher = new S3SecretBatcher();
+ private boolean isLeaderExecutorEnabled = false;
/**
* OmMetadataManagerImpl constructor.
@@ -359,6 +360,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
// For test purpose only
ignorePipelineinKey = conf.getBoolean(
"ozone.om.ignore.pipeline", Boolean.TRUE);
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
start(conf);
}
@@ -370,6 +373,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
this.lock = new OzoneManagerLock(conf);
this.omEpoch = 0;
perfMetrics = null;
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
}
public static OmMetadataManagerImpl createCheckpointMetadataManager(
@@ -401,6 +406,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
throws IOException {
lock = new OmReadOnlyLock();
omEpoch = 0;
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
setStore(loadDB(conf, dir, name, true,
java.util.Optional.of(Boolean.TRUE), Optional.empty()));
initializeOmTables(CacheType.PARTIAL_CACHE, false);
@@ -414,6 +421,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
try {
lock = new OmReadOnlyLock();
omEpoch = 0;
+    isLeaderExecutorEnabled = conf.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
+        OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
String snapshotDir = OMStorage.getOmDbDir(conf) +
OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR;
File metaDir = new File(snapshotDir);
@@ -683,9 +692,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
protected void initializeOmTables(CacheType cacheType,
boolean addCacheMetrics)
throws IOException {
+ CacheType defaultCacheType = CacheType.PARTIAL_CACHE;
+ if (isLeaderExecutorEnabled) {
+      // TODO HDDS-11415: change PARTIAL_CACHE to NO_CACHE; PARTIAL_CACHE is kept for now so existing test cases pass
+      defaultCacheType = CacheType.PARTIAL_CACHE;
+ }
userTable =
this.store.getTable(USER_TABLE, String.class,
- PersistedUserVolumeInfo.class);
+ PersistedUserVolumeInfo.class, defaultCacheType);
checkTableStatus(userTable, USER_TABLE, addCacheMetrics);
volumeTable =
@@ -699,92 +713,92 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
checkTableStatus(bucketTable, BUCKET_TABLE, addCacheMetrics);
- keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
+    keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class, defaultCacheType);
checkTableStatus(keyTable, KEY_TABLE, addCacheMetrics);
deletedTable = this.store.getTable(DELETED_TABLE, String.class,
- RepeatedOmKeyInfo.class);
+ RepeatedOmKeyInfo.class, defaultCacheType);
checkTableStatus(deletedTable, DELETED_TABLE, addCacheMetrics);
openKeyTable =
this.store.getTable(OPEN_KEY_TABLE, String.class,
- OmKeyInfo.class);
+ OmKeyInfo.class, defaultCacheType);
checkTableStatus(openKeyTable, OPEN_KEY_TABLE, addCacheMetrics);
multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
- String.class, OmMultipartKeyInfo.class);
+ String.class, OmMultipartKeyInfo.class, defaultCacheType);
checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE, addCacheMetrics);
dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
- OzoneTokenIdentifier.class, Long.class);
+ OzoneTokenIdentifier.class, Long.class, defaultCacheType);
checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE, addCacheMetrics);
s3SecretTable = this.store.getTable(S3_SECRET_TABLE, String.class,
- S3SecretValue.class);
+ S3SecretValue.class, defaultCacheType);
checkTableStatus(s3SecretTable, S3_SECRET_TABLE, addCacheMetrics);
prefixTable = this.store.getTable(PREFIX_TABLE, String.class,
- OmPrefixInfo.class);
+ OmPrefixInfo.class, defaultCacheType);
checkTableStatus(prefixTable, PREFIX_TABLE, addCacheMetrics);
dirTable = this.store.getTable(DIRECTORY_TABLE, String.class,
- OmDirectoryInfo.class);
+ OmDirectoryInfo.class, defaultCacheType);
checkTableStatus(dirTable, DIRECTORY_TABLE, addCacheMetrics);
fileTable = this.store.getTable(FILE_TABLE, String.class,
- OmKeyInfo.class);
+ OmKeyInfo.class, defaultCacheType);
checkTableStatus(fileTable, FILE_TABLE, addCacheMetrics);
openFileTable = this.store.getTable(OPEN_FILE_TABLE, String.class,
- OmKeyInfo.class);
+ OmKeyInfo.class, defaultCacheType);
checkTableStatus(openFileTable, OPEN_FILE_TABLE, addCacheMetrics);
deletedDirTable = this.store.getTable(DELETED_DIR_TABLE, String.class,
- OmKeyInfo.class);
+ OmKeyInfo.class, defaultCacheType);
checkTableStatus(deletedDirTable, DELETED_DIR_TABLE, addCacheMetrics);
transactionInfoTable = this.store.getTable(TRANSACTION_INFO_TABLE,
- String.class, TransactionInfo.class);
+ String.class, TransactionInfo.class, defaultCacheType);
checkTableStatus(transactionInfoTable, TRANSACTION_INFO_TABLE,
addCacheMetrics);
- metaTable = this.store.getTable(META_TABLE, String.class, String.class);
+    metaTable = this.store.getTable(META_TABLE, String.class, String.class, defaultCacheType);
checkTableStatus(metaTable, META_TABLE, addCacheMetrics);
// accessId -> OmDBAccessIdInfo (tenantId, secret, Kerberos principal)
tenantAccessIdTable = this.store.getTable(TENANT_ACCESS_ID_TABLE,
- String.class, OmDBAccessIdInfo.class);
+ String.class, OmDBAccessIdInfo.class, defaultCacheType);
checkTableStatus(tenantAccessIdTable, TENANT_ACCESS_ID_TABLE,
addCacheMetrics);
// User principal -> OmDBUserPrincipalInfo (a list of accessIds)
principalToAccessIdsTable = this.store.getTable(
PRINCIPAL_TO_ACCESS_IDS_TABLE,
- String.class, OmDBUserPrincipalInfo.class);
+ String.class, OmDBUserPrincipalInfo.class, defaultCacheType);
checkTableStatus(principalToAccessIdsTable, PRINCIPAL_TO_ACCESS_IDS_TABLE,
addCacheMetrics);
// tenant name -> tenant (tenant states)
tenantStateTable = this.store.getTable(TENANT_STATE_TABLE,
- String.class, OmDBTenantState.class);
+ String.class, OmDBTenantState.class, defaultCacheType);
checkTableStatus(tenantStateTable, TENANT_STATE_TABLE, addCacheMetrics);
// TODO: [SNAPSHOT] Consider FULL_CACHE for snapshotInfoTable since
// exclusiveSize in SnapshotInfo can be frequently updated.
// path -> snapshotInfo (snapshot info for snapshot)
snapshotInfoTable = this.store.getTable(SNAPSHOT_INFO_TABLE,
- String.class, SnapshotInfo.class);
+ String.class, SnapshotInfo.class, defaultCacheType);
checkTableStatus(snapshotInfoTable, SNAPSHOT_INFO_TABLE, addCacheMetrics);
// volumeName/bucketName/objectID -> renamedKey or renamedDir
snapshotRenamedTable = this.store.getTable(SNAPSHOT_RENAMED_TABLE,
- String.class, String.class);
+ String.class, String.class, defaultCacheType);
checkTableStatus(snapshotRenamedTable, SNAPSHOT_RENAMED_TABLE,
addCacheMetrics);
// TODO: [SNAPSHOT] Initialize table lock for snapshotRenamedTable.
compactionLogTable = this.store.getTable(COMPACTION_LOG_TABLE,
- String.class, CompactionLogEntry.class);
+ String.class, CompactionLogEntry.class, defaultCacheType);
checkTableStatus(compactionLogTable, COMPACTION_LOG_TABLE,
addCacheMetrics);
}
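Every table above now receives an explicit CacheType through the getTable(...) overload.
Once the HDDS-11415 TODO is resolved, the selection presumably collapses to something
like the following sketch (an assumption, not part of this commit):

    CacheType defaultCacheType = isLeaderExecutorEnabled
        ? CacheType.NO_CACHE        // leader executor serializes writes via granular locks
        : CacheType.PARTIAL_CACHE;  // classic double-buffer flow still needs the cache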
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/FSOPrefixLocking.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/FSOPrefixLocking.java
new file mode 100644
index 0000000000..a3fd8dbd35
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/FSOPrefixLocking.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+
+/**
+ * Prefix locking for FSO.
+ */
+public class FSOPrefixLocking {
+  private static final int LRU_CACHE_MIN_SIZE = 1000;
+ private static final int LRU_TRY_REMOVE_PERCENT = 10;
+ private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
+ private final ScheduledExecutorService executorService;
+ private final AtomicLong writeLockCnt = new AtomicLong();
+ private final AtomicLong readLockCnt = new AtomicLong();
+
+ private LockInfo lockInfo = new LockInfo(null, null);
+ private NavigableSet<LockInfo> lru = new ConcurrentSkipListSet<>();
+
+ public FSOPrefixLocking(String threadPrefix) {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat(threadPrefix + "FSOPrefixLocking-Cleanup-%d").build();
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    executorService.scheduleWithFixedDelay(() -> cleanupTask(), 1000L, 1000L, TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ executorService.shutdown();
+ }
+
+ private void cleanupTask() {
+ // most recent will be leaf node
+ int size = lru.size();
+    if (size > LRU_CACHE_MIN_SIZE) {
+ int count = size / LRU_TRY_REMOVE_PERCENT;
+ for (int i = 0; i < count; ++i) {
+ LockInfo tmpLockInfo = lru.pollLast();
+ // try remove, if not successful, add back
+ try {
+ boolean b = tmpLockInfo.removeSelf();
+ if (!b) {
+ lru.add(tmpLockInfo);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ public List<LockInfo> readLock(List<String> keyElements) throws OMException {
+ List<LockInfo> acquiredLocks = new ArrayList<>(keyElements.size());
+ LockInfo tmpLockInfo = lockInfo;
+ for (String path : keyElements) {
+ tmpLockInfo = tmpLockInfo.addPath(path);
+ tmpLockInfo.readLock();
+ acquiredLocks.add(tmpLockInfo);
+ readLockCnt.incrementAndGet();
+ }
+ return acquiredLocks;
+ }
+
+ public void readUnlock(List<LockInfo> acquiredLocks) {
+    ListIterator<LockInfo> li = acquiredLocks.listIterator(acquiredLocks.size());
+ while (li.hasPrevious()) {
+ LockInfo previous = li.previous();
+ previous.readUnlock();
+ updateLru(previous);
+ readLockCnt.decrementAndGet();
+ }
+ }
+
+  public LockInfo writeLock(LockInfo prvLockInfo, String name) throws IOException {
+ if (null == prvLockInfo) {
+ prvLockInfo = lockInfo;
+ }
+ prvLockInfo = prvLockInfo.addPath(name);
+ prvLockInfo.writeLock();
+ writeLockCnt.incrementAndGet();
+ return prvLockInfo;
+ }
+
+ public void writeUnlock(LockInfo curLockInfo) {
+ curLockInfo.writeUnlock();
+ updateLru(curLockInfo);
+ writeLockCnt.decrementAndGet();
+ }
+
+ private void updateLru(LockInfo previous) {
+    // the LRU position is refreshed on unlock, once the entry becomes eligible for cleanup
+ lru.remove(previous);
+ lru.add(previous);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Lock status: Write lock count %d, Read lock Count
%d", writeLockCnt.get(), readLockCnt.get());
+ }
+ /**
+ * lock info about lock tree.
+ */
+ public static class LockInfo implements Comparable {
+ private String key;
+ private Map<String, LockInfo> lockMap = new ConcurrentHashMap<>();
+ private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ private LockInfo parent;
+ public LockInfo(LockInfo p, String k) {
+ parent = p;
+ this.key = k;
+ }
+ public LockInfo get(String path) {
+ return lockMap.get(path);
+ }
+ public LockInfo addPath(String path) {
+ // concurrentHashMap computeIfAbsent is atomic
+      return lockMap.computeIfAbsent(path, k -> new LockInfo(this, k));
+ }
+ public void readLock() throws OMException {
+ try {
+        boolean b = rwLock.readLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+        if (!b) {
+          throw new OMException("Unable to get read lock for " + key + " after " + LOCK_TIMEOUT + "ms",
+              OMException.ResultCodes.TIMEOUT);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ public void readUnlock() {
+ rwLock.readLock().unlock();
+ }
+ public void writeLock() throws IOException {
+ try {
+ boolean b = writeLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!b) {
+ throw new OMException("Unable to get write lock for " + key + "
after " + LOCK_TIMEOUT + "ms",
+ OMException.ResultCodes.TIMEOUT);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ public void writeUnlock() {
+ rwLock.writeLock().unlock();
+ }
+    private boolean writeLock(long time, TimeUnit timeUnit) throws InterruptedException {
+ return rwLock.writeLock().tryLock(time, timeUnit);
+ }
+ private void remove(String path) {
+ lockMap.remove(path);
+ }
+ public boolean removeSelf() throws InterruptedException {
+ boolean selfLock = false;
+ boolean parentLock = false;
+ try {
+ selfLock = writeLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (selfLock) {
+ parentLock = parent.writeLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (parentLock) {
+ parent.remove(key);
+ }
+ }
+ } finally {
+ if (selfLock) {
+ writeUnlock();
+ }
+ if (parentLock) {
+ parent.writeUnlock();
+ }
+ }
+ return parentLock;
+ }
+
+ public LockInfo getParent() {
+ return parent;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof LockInfo)) {
+ return false;
+ }
+ if (this == other) {
+ return true;
+ }
+ return this.compareTo(other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (!(o instanceof LockInfo)) {
+ return -1;
+ }
+      int rst = Integer.compare(parent.hashCode(), ((LockInfo) o).parent.hashCode());
+ if (rst == 0) {
+ return key.compareTo(((LockInfo) o).key);
+ }
+ return rst;
+ }
+ }
+}
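A hedged usage sketch for FSOPrefixLocking, mirroring how lockFso() in OmLockOpr
(further below) drives it: read locks down the existing ancestors, a write lock on the
leaf, released in reverse order. The path elements here are hypothetical:

    FSOPrefixLocking prefixLocking = new FSOPrefixLocking("om-");
    // read-lock the existing ancestors of a/b/file
    List<FSOPrefixLocking.LockInfo> readLocks = prefixLocking.readLock(Arrays.asList("a", "b"));
    // write-lock the leaf under the deepest read-locked node
    FSOPrefixLocking.LockInfo leaf = prefixLocking.writeLock(readLocks.get(readLocks.size() - 1), "file");
    try {
      // ... mutate the key under a/b/file ...
    } finally {
      prefixLocking.writeUnlock(leaf);     // leaf first
      prefixLocking.readUnlock(readLocks); // then ancestors, deepest first
    }
    prefixLocking.stop();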
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java
new file mode 100644
index 0000000000..ad6ddc6dc1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLocking.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.util.concurrent.Striped;
+import java.io.IOException;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Striped read/write locking keyed by name.
+ */
+public class KeyLocking {
+ private static final Logger LOG = LoggerFactory.getLogger(KeyLocking.class);
+ private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 10240;
+ private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
+  private Striped<ReadWriteLock> fileStripedLock = Striped.readWriteLock(DEFAULT_FILE_LOCK_STRIPED_SIZE);
+ private AtomicLong writeLockCount = new AtomicLong();
+ private AtomicLong readLockCount = new AtomicLong();
+ private AtomicLong failedLockCount = new AtomicLong();
+ private AtomicLong failedUnlockCount = new AtomicLong();
+
+ public void lock(List<String> keyList) throws IOException {
+ for (String key : keyList) {
+ lock(key);
+ }
+ }
+
+ public void unlock(List<String> keyList) {
+ ListIterator<String> itr = keyList.listIterator(keyList.size());
+ while (itr.hasPrevious()) {
+ unlock(itr.previous());
+ }
+ }
+
+ public void lock(String key) throws IOException {
+ LOG.debug("Key {} is locked for instance {} {}", key, this,
fileStripedLock.get(key));
+ try {
+ boolean b = fileStripedLock.get(key).writeLock().tryLock(LOCK_TIMEOUT,
TimeUnit.MILLISECONDS);
+ if (!b) {
+ LOG.error("Key {} lock is failed after wait of {}ms", key,
LOCK_TIMEOUT);
+ failedLockCount.incrementAndGet();
+ throw new OMException("Unable to get write lock for " + key + " after
" + LOCK_TIMEOUT + "ms"
+ + ", lock info: " + fileStripedLock.get(key).readLock(),
+ OMException.ResultCodes.TIMEOUT);
+ }
+ writeLockCount.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void unlock(String key) {
+ LOG.debug("Key {} is un-locked for instance {} {}", key, this,
fileStripedLock.get(key));
+ try {
+ fileStripedLock.get(key).writeLock().unlock();
+ } catch (Throwable th) {
+ LOG.error("Key {} un-lock is failed", key, th);
+ failedUnlockCount.incrementAndGet();
+ throw th;
+ }
+ writeLockCount.decrementAndGet();
+ }
+
+ public void readLock(List<String> keyList) throws OMException {
+ for (String key : keyList) {
+ readLock(key);
+ }
+ }
+
+ public void readUnlock(List<String> keyList) {
+ ListIterator<String> itr = keyList.listIterator(keyList.size());
+ while (itr.hasPrevious()) {
+ readUnlock(itr.previous());
+ }
+ }
+ public void readLock(String key) throws OMException {
+    try {
+      LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key));
+      boolean b = fileStripedLock.get(key).readLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
+      if (!b) {
+        failedLockCount.incrementAndGet();
+        throw new OMException("Unable to get read lock for " + key + " after " + LOCK_TIMEOUT + "ms",
+            OMException.ResultCodes.TIMEOUT);
+ }
+ readLockCount.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void readUnlock(String key) {
+ try {
+ LOG.debug("Key {} is read un-locked for instance {} {}", key, this,
fileStripedLock.get(key));
+ fileStripedLock.get(key).readLock().unlock();
+ } catch (Throwable th) {
+ LOG.error("Key {} read un-lock is failed", key, th);
+ failedUnlockCount.incrementAndGet();
+ throw th;
+ }
+ readLockCount.decrementAndGet();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Lock status: Write lock count %d, Read lock Count
%d, failed lock count %d" +
+ ", failed un-lock count %d", writeLockCount.get(),
readLockCount.get(), failedLockCount.get(),
+ failedUnlockCount.get());
+ }
+}
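A usage sketch for KeyLocking (the key names are hypothetical). Callers that lock
several keys should acquire them in one global order, which is why OmLockOpr (below)
sorts keyNameList before locking; unlock(List) already releases in reverse:

    KeyLocking keyLocking = new KeyLocking();
    List<String> keys = new ArrayList<>(Arrays.asList("key-b", "key-a"));
    Collections.sort(keys);    // one global acquisition order avoids lock-order deadlocks
    keyLocking.lock(keys);     // write-locks each key's stripe in order
    try {
      // ... apply the batched key updates ...
    } finally {
      keyLocking.unlock(keys); // walks the list in reverse internally
    }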
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java
new file mode 100644
index 0000000000..8ce7855bc5
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lock operations for OM requests.
+ */
+public class OmLockOpr {
+ private static final Logger LOG = LoggerFactory.getLogger(OmLockOpr.class);
+ private static final long MONITOR_DELAY = 10 * 60 * 1000;
+ private static KeyLocking keyLocking;
+  // volume locking is kept separate because different lock key types can share the same name
+ private static KeyLocking volumeLocking;
+ private static KeyLocking bucketLocking;
+ private static KeyLocking snapshotLocking;
+ private static FSOPrefixLocking prefixLocking;
+ private static Map<OmLockOpr, OmLockOpr> lockedObjMap;
+ private static ScheduledExecutorService executorService;
+
+ private LockType type;
+ private String volumeName;
+ private String bucketName;
+ private String keyName;
+ private List<String> keyNameList;
+ private List<List<FSOPrefixLocking.LockInfo>> lockedKeyFsoList = null;
+ private List<String> lockedParentList = null;
+ private OMLockDetails lockDetails = new OMLockDetails();
+ private long lockTakenTime = 0;
+
+ public OmLockOpr() {
+ this.type = LockType.NONE;
+ }
+
+ public OmLockOpr(LockType type, String keyName) {
+    assert (type == LockType.W_VOLUME || type == LockType.R_VOLUME || type == LockType.W_BUCKET
+        || type == LockType.WRITE);
+ this.type = type;
+ this.keyName = keyName;
+ }
+ public OmLockOpr(LockType type, String volumeName, String bucketName) {
+ assert (type == LockType.RW_VOLUME_BUCKET);
+ this.type = type;
+ this.volumeName = volumeName;
+ this.bucketName = bucketName;
+ }
+  public OmLockOpr(LockType type, String volume, String bucket, String keyName) {
+ this.type = type;
+ this.volumeName = volume;
+ this.bucketName = bucket;
+ this.keyName = keyName;
+ }
+
+  public OmLockOpr(LockType type, String volume, String bucket, List<String> keyNameList) {
+ this.type = type;
+ this.volumeName = volume;
+ this.bucketName = bucket;
+ this.keyNameList = new ArrayList<>(keyNameList);
+ Collections.sort(this.keyNameList);
+ }
+ public static void init(String threadNamePrefix) {
+ keyLocking = new KeyLocking();
+ volumeLocking = new KeyLocking();
+ snapshotLocking = new KeyLocking();
+ bucketLocking = new KeyLocking();
+ prefixLocking = new FSOPrefixLocking(threadNamePrefix);
+ lockedObjMap = new ConcurrentHashMap<>();
+ // init scheduler to check and monitor
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat(threadNamePrefix + "OmLockOpr-Monitor-%d").build();
+ executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    executorService.scheduleWithFixedDelay(() -> monitor(), 0, MONITOR_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ public static void stop() {
+ executorService.shutdown();
+ prefixLocking.stop();
+ }
+
+ public static void monitor() {
+ LOG.info("FSO: " + prefixLocking.toString());
+ LOG.info("Volume: " + volumeLocking.toString());
+ LOG.info("Snapshot: " + snapshotLocking.toString());
+ LOG.info("Key: " + keyLocking.toString());
+ LOG.info("Key: " + bucketLocking.toString());
+ LOG.info("Lock operation Status crossing threshold (10 minutes):");
+ long startTime = Time.monotonicNowNanos();
+ for (Map.Entry<OmLockOpr, OmLockOpr> entry : lockedObjMap.entrySet()) {
+ if ((startTime - entry.getKey().getLockTakenTime()) > MONITOR_DELAY) {
+ LOG.info("Lock is hold for {}", entry.getKey());
+ }
+ }
+ }
+
+ private long getLockTakenTime() {
+ return lockTakenTime;
+ }
+
+ public void lock(OzoneManager om) throws IOException {
+ long startTime, endTime;
+ switch (type) {
+ case W_FSO:
+ startTime = Time.monotonicNowNanos();
+ bucketLocking.readLock(bucketName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ lockedKeyFsoList = new ArrayList<>();
+ if (keyName != null) {
+        lockedKeyFsoList.add(lockFso(om.getMetadataManager(), volumeName, bucketName, keyName, lockDetails));
+      } else {
+        for (String key : keyNameList) {
+          lockedKeyFsoList.add(lockFso(om.getMetadataManager(), volumeName, bucketName, key, lockDetails));
+ }
+ }
+ break;
+ case W_OBS:
+ startTime = Time.monotonicNowNanos();
+ bucketLocking.readLock(bucketName);
+ if (null != keyName) {
+ keyLocking.lock(keyName);
+ } else {
+ keyLocking.lock(keyNameList);
+ }
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case W_LEGACY_FSO:
+ startTime = Time.monotonicNowNanos();
+ bucketLocking.readLock(bucketName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+      lockedParentList = getMissingParentPathList(om.getMetadataManager(), volumeName, bucketName, keyName);
+ startTime = Time.monotonicNowNanos();
+ keyLocking.lock(lockedParentList);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case SNAPSHOT:
+ startTime = Time.monotonicNowNanos();
+ bucketLocking.readLock(bucketName);
+ if (keyName != null) {
+ snapshotLocking.lock(keyName);
+ } else {
+ snapshotLocking.lock(keyNameList);
+ }
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case RW_VOLUME_BUCKET:
+ startTime = Time.monotonicNowNanos();
+ volumeLocking.readLock(volumeName);
+ bucketLocking.lock(bucketName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case W_VOLUME:
+ startTime = Time.monotonicNowNanos();
+ volumeLocking.lock(keyName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case R_VOLUME:
+ startTime = Time.monotonicNowNanos();
+ volumeLocking.readLock(keyName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case W_BUCKET:
+ startTime = Time.monotonicNowNanos();
+ bucketLocking.lock(keyName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case WRITE:
+ startTime = Time.monotonicNowNanos();
+ keyLocking.lock(keyName);
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ break;
+ case NONE:
+ break;
+ default:
+ LOG.error("Invalid Lock Operation type {}", type);
+ }
+ lockTakenTime = Time.monotonicNowNanos();
+ lockDetails.setLockAcquired(true);
+ lockedObjMap.put(this, this);
+ }
+
+ public void unlock() {
+ if (!lockDetails.isLockAcquired()) {
+ return;
+ }
+ lockDetails.setLockAcquired(false);
+ switch (type) {
+ case W_FSO:
+ for (List<FSOPrefixLocking.LockInfo> lockedFsoList : lockedKeyFsoList) {
+ unlockFso(lockedFsoList);
+ }
+ bucketLocking.readUnlock(bucketName);
+ break;
+ case W_OBS:
+ if (null != keyName) {
+ keyLocking.unlock(keyName);
+ } else {
+ keyLocking.unlock(keyNameList);
+ }
+ bucketLocking.readUnlock(bucketName);
+ break;
+ case W_LEGACY_FSO:
+ keyLocking.unlock(lockedParentList);
+ bucketLocking.readUnlock(bucketName);
+ break;
+ case SNAPSHOT:
+ if (keyName != null) {
+ snapshotLocking.unlock(keyName);
+ } else {
+ snapshotLocking.unlock(keyNameList);
+ }
+ bucketLocking.readUnlock(bucketName);
+ break;
+ case RW_VOLUME_BUCKET:
+ bucketLocking.unlock(bucketName);
+ volumeLocking.readUnlock(volumeName);
+ break;
+ case W_VOLUME:
+ volumeLocking.unlock(keyName);
+ break;
+ case R_VOLUME:
+ volumeLocking.readUnlock(keyName);
+ break;
+ case W_BUCKET:
+ bucketLocking.unlock(keyName);
+ break;
+ case WRITE:
+ keyLocking.unlock(keyName);
+ break;
+ case NONE:
+ break;
+ default:
+ LOG.error("Invalid un-lock Operation type {}", type);
+ }
+ if (lockTakenTime > 0) {
+ if (type == LockType.R_VOLUME) {
+        lockDetails.add(Time.monotonicNowNanos() - lockTakenTime, OMLockDetails.LockOpType.READ);
+      } else {
+        lockDetails.add(Time.monotonicNowNanos() - lockTakenTime, OMLockDetails.LockOpType.WRITE);
+ }
+ }
+ lockedObjMap.remove(this);
+ }
+ private static List<FSOPrefixLocking.LockInfo> lockFso(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName, String keyName,
+ OMLockDetails lockDetails) throws IOException {
+ List<String> pathList = getPathList(keyName);
+ List<String> dirList = pathList.subList(0, pathList.size() - 1);
+ long startTime = Time.monotonicNowNanos();
+ List<FSOPrefixLocking.LockInfo> locks = prefixLocking.readLock(dirList);
+ long endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ // validate for path exist
+ final long volumeId = omMetadataManager.getVolumeId(volumeName);
+ String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+ final long bucketId = omBucketInfo.getObjectID();
+ long parentId = omBucketInfo.getObjectID();
+ int splitIndex = -1;
+ for (int i = 0; i < dirList.size(); ++i) {
+      String dbNodeName = omMetadataManager.getOzonePathKey(volumeId, bucketId, parentId, dirList.get(i));
+      OmDirectoryInfo omDirInfo = omMetadataManager.getDirectoryTable().get(dbNodeName);
+ if (omDirInfo != null) {
+ parentId = omDirInfo.getObjectID();
+ continue;
+ }
+      if (omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED).isExist(dbNodeName)) {
+        // a file exists where a directory is expected: take the write lock from this point, since the
+        // directory does not exist and the path cannot be traversed deeper
+        LOG.debug("dir as file {} exists, using it as the split point for the write lock on path {}", dbNodeName, keyName);
+ }
+ splitIndex = i;
+ break;
+ }
+
+ // update lock with write
+ startTime = Time.monotonicNowNanos();
+ if (splitIndex == 0) {
+ prefixLocking.readUnlock(locks);
+ locks.clear();
+ locks.add(prefixLocking.writeLock(null, dirList.get(splitIndex)));
+ } else if (splitIndex != -1) {
+      List<FSOPrefixLocking.LockInfo> releaseLocks = locks.subList(splitIndex, dirList.size());
+      prefixLocking.readUnlock(releaseLocks);
+      locks = locks.subList(0, splitIndex);
+      locks.add(prefixLocking.writeLock(locks.get(splitIndex - 1), dirList.get(splitIndex)));
+ } else {
+ String leafNode = pathList.get(pathList.size() - 1);
+ if (locks.isEmpty()) {
+ locks.add(prefixLocking.writeLock(null, leafNode));
+ } else {
+        locks.add(prefixLocking.writeLock(locks.get(locks.size() - 1), leafNode));
+ }
+ }
+ endTime = Time.monotonicNowNanos();
+ lockDetails.add(endTime - startTime, OMLockDetails.LockOpType.WAIT);
+ return locks;
+ }
+
+  private static void unlockFso(List<FSOPrefixLocking.LockInfo> acquiredLocks) {
+    if (acquiredLocks.isEmpty()) {
+ return;
+ }
+ prefixLocking.writeUnlock(acquiredLocks.get(acquiredLocks.size() - 1));
+ if (acquiredLocks.size() > 1) {
+      prefixLocking.readUnlock(acquiredLocks.subList(0, acquiredLocks.size() - 1));
+ }
+ }
+
+ private static List<String> getPathList(String fullPath) {
+ List<String> pathList = new ArrayList<>();
+ Path path = Paths.get(fullPath);
+ Iterator<Path> elements = path.iterator();
+ while (elements.hasNext()) {
+ pathList.add(elements.next().toString());
+ }
+ return pathList;
+ }
+
+ private static List<String> getMissingParentPathList(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName, String fullPath) throws IOException {
+ List<String> pathList = new ArrayList<>();
+ Path path = Paths.get(fullPath);
+ while (path != null) {
+ String pathName = path.toString();
+      String dbKeyName = omMetadataManager.getOzoneKey(volumeName, bucketName, pathName);
+      String dbDirKeyName = omMetadataManager.getOzoneDirKey(volumeName, bucketName, pathName);
+      if (!omMetadataManager.getKeyTable(BucketLayout.LEGACY).isExist(dbKeyName)
+          && !omMetadataManager.getKeyTable(BucketLayout.LEGACY).isExist(dbDirKeyName)) {
+ pathList.add(pathName);
+ }
+ path = path.getParent();
+ }
+ return pathList;
+ }
+
+ public OMLockDetails getLockDetails() {
+ return lockDetails;
+ }
+
+  public String toString() {
+    return String.format("type %s, volumeName %s, bucketName %s, keyName %s, keyNameList %s",
+        type.name(), volumeName, bucketName, keyName, keyNameList);
+ }
+ /**
+   * Lock operation type.
+ */
+ public enum LockType {
+ NONE,
+    // FSO-style locking: read locks on the existing path elements, a write lock on the last (missing) element
+    // volume name, bucket name and either keyName or keyNameList are mandatory
+    W_FSO,
+    // OBS-style locking: a write lock on every key element
+    W_OBS,
+    // Legacy/FSO locking: identify the missing path elements and take write locks on them
+ W_LEGACY_FSO,
+ W_VOLUME,
+ R_VOLUME,
+ W_BUCKET,
+ WRITE,
+ // lock volume as read, but bucket as write lock
+ RW_VOLUME_BUCKET,
+    // snapshot locking: a read lock on the bucket plus a write lock on the snapshot name
+ SNAPSHOT
+ }
+}
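Putting the pieces together, a request's lock lifecycle presumably looks like this
sketch ('om' stands in for the OzoneManager instance; not verbatim from this commit):

    OmLockOpr.init("om-");  // once, at startup
    OmLockOpr lockOpr = new OmLockOpr(OmLockOpr.LockType.W_OBS, "vol1", "buck1", "key1");
    lockOpr.lock(om);       // read-locks the bucket, write-locks the key
    try {
      // ... execute the request against OM metadata ...
    } finally {
      lockOpr.unlock();     // releases locks and records the hold time
    }
    OmLockOpr.stop();       // at shutdown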
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestLockUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestLockUtils.java
new file mode 100644
index 0000000000..196e7580bc
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestLockUtils.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.lock.OmLockOpr.LockType;
+import org.apache.hadoop.ozone.om.request.util.ObjectParser;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+
+/**
+ * Map request lock for lock and unlock.
+ */
+public final class OmRequestLockUtils {
+ private static Map<OzoneManagerProtocolProtos.Type,
+      BiFunction<OzoneManager, OzoneManagerProtocolProtos.OMRequest, OmLockOpr>> reqLockMap = new HashMap<>();
+ private static final OmLockOpr NO_LOCK = new OmLockOpr();
+ private static final OmLockOpr EXP_LOCK = new OmLockOpr();
+
+ private OmRequestLockUtils() {
+ }
+
+ public static void init() {
+    reqLockMap.put(Type.CreateVolume, (om, req) ->
+        new OmLockOpr(LockType.W_VOLUME, req.getCreateVolumeRequest().getVolumeInfo().getVolume()));
+    reqLockMap.put(Type.SetVolumeProperty, (om, req) ->
+        new OmLockOpr(LockType.R_VOLUME, req.getSetVolumePropertyRequest().getVolumeName()));
+    reqLockMap.put(Type.DeleteVolume, (om, req) ->
+        new OmLockOpr(LockType.W_VOLUME, req.getDeleteVolumeRequest().getVolumeName()));
+    reqLockMap.put(Type.CreateBucket, (om, req) -> new OmLockOpr(LockType.RW_VOLUME_BUCKET,
+        req.getCreateBucketRequest().getBucketInfo().getVolumeName(),
+        req.getCreateBucketRequest().getBucketInfo().getBucketName()));
+    reqLockMap.put(Type.DeleteBucket, (om, req) ->
+        new OmLockOpr(LockType.W_BUCKET, req.getDeleteBucketRequest().getBucketName()));
+    reqLockMap.put(Type.SetBucketProperty, (om, req) ->
+        new OmLockOpr(LockType.W_BUCKET, req.getSetBucketPropertyRequest().getBucketArgs().getBucketName()));
+    reqLockMap.put(Type.AddAcl, (om, req) -> aclLockInfo(om, req.getAddAclRequest().getObj()));
+    reqLockMap.put(Type.RemoveAcl, (om, req) -> aclLockInfo(om, req.getRemoveAclRequest().getObj()));
+    reqLockMap.put(Type.SetAcl, (om, req) -> aclLockInfo(om, req.getSetAclRequest().getObj()));
+    reqLockMap.put(Type.GetS3Secret, (om, req) -> new OmLockOpr(LockType.WRITE,
+        req.getGetS3SecretRequest().getKerberosID()));
+    reqLockMap.put(Type.SetS3Secret, (om, req) -> new OmLockOpr(LockType.WRITE,
+        req.getSetS3SecretRequest().getAccessId()));
+    reqLockMap.put(Type.RevokeS3Secret, (om, req) -> new OmLockOpr(LockType.WRITE,
+        req.getRevokeS3SecretRequest().getKerberosID()));
+ reqLockMap.put(Type.GetDelegationToken, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.CancelDelegationToken, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.RenewDelegationToken, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.FinalizeUpgrade, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.Prepare, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.CancelPrepare, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.SetRangerServiceVersion, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.EchoRPC, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.QuotaRepair, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.AbortExpiredMultiPartUploads, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.PurgeKeys, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.PurgeDirectories, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.SnapshotMoveDeletedKeys, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.SnapshotMoveTableKeys, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.SnapshotPurge, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.SetSnapshotProperty, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.DeleteOpenKeys, (om, req) -> NO_LOCK);
+ reqLockMap.put(Type.PersistDb, (om, req) -> NO_LOCK);
+    reqLockMap.put(Type.CreateTenant, (om, req) ->
+        new OmLockOpr(LockType.W_VOLUME, req.getCreateTenantRequest().getVolumeName()));
+    reqLockMap.put(Type.DeleteTenant, (om, req) ->
+        getTenantLock(om, LockType.W_VOLUME, req.getDeleteTenantRequest().getTenantId()));
+    reqLockMap.put(Type.TenantAssignUserAccessId, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, req.getTenantAssignUserAccessIdRequest().getTenantId()));
+    reqLockMap.put(Type.TenantRevokeUserAccessId, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, req.getTenantRevokeUserAccessIdRequest().getTenantId()));
+    reqLockMap.put(Type.TenantAssignAdmin, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, req.getTenantAssignAdminRequest().getTenantId()));
+    reqLockMap.put(Type.TenantRevokeAdmin, (om, req) ->
+        getTenantLock(om, LockType.R_VOLUME, req.getTenantRevokeAdminRequest().getTenantId()));
+    reqLockMap.put(Type.CreateSnapshot, (om, req) -> new OmLockOpr(LockType.SNAPSHOT,
+        req.getCreateSnapshotRequest().getVolumeName(), req.getCreateSnapshotRequest().getBucketName(),
+        req.getCreateSnapshotRequest().getSnapshotName()));
+    reqLockMap.put(Type.DeleteSnapshot, (om, req) -> new OmLockOpr(LockType.SNAPSHOT,
+        req.getDeleteSnapshotRequest().getVolumeName(), req.getDeleteSnapshotRequest().getBucketName(),
+        req.getDeleteSnapshotRequest().getSnapshotName()));
+    reqLockMap.put(Type.RenameSnapshot, (om, req) -> new OmLockOpr(LockType.SNAPSHOT,
+        req.getRenameSnapshotRequest().getVolumeName(), req.getRenameSnapshotRequest().getBucketName(),
+        Arrays.asList(req.getRenameSnapshotRequest().getSnapshotOldName(),
+            req.getRenameSnapshotRequest().getSnapshotNewName())));
+    reqLockMap.put(Type.RecoverLease, (om, req) -> new OmLockOpr(LockType.W_FSO,
+        req.getRecoverLeaseRequest().getVolumeName(), req.getRecoverLeaseRequest().getBucketName(),
+        req.getRecoverLeaseRequest().getKeyName()));
+    reqLockMap.put(Type.CreateDirectory, (om, req) -> getKeyLock(om, req.getCreateDirectoryRequest().getKeyArgs()));
+    reqLockMap.put(Type.CreateFile, (om, req) -> getKeyLock(om, req.getCreateFileRequest().getKeyArgs()));
+    reqLockMap.put(Type.CreateKey, (om, req) -> getKeyLock(om, req.getCreateKeyRequest().getKeyArgs()));
+    reqLockMap.put(Type.AllocateBlock, (om, req) -> getKeyLock(om, req.getAllocateBlockRequest().getKeyArgs()));
+    reqLockMap.put(Type.CommitKey, (om, req) -> getKeyLock(om, req.getCommitKeyRequest().getKeyArgs()));
+    reqLockMap.put(Type.DeleteKey, (om, req) -> getKeyLock(om, req.getDeleteKeyRequest().getKeyArgs()));
+    reqLockMap.put(Type.DeleteKeys, (om, req) -> getDeleteKeyLock(om, req.getDeleteKeysRequest().getDeleteKeys()));
+    reqLockMap.put(Type.RenameKey, (om, req) -> getRenameKey(om, req.getRenameKeyRequest()));
+    reqLockMap.put(Type.RenameKeys, (om, req) -> {
+      OzoneManagerProtocolProtos.RenameKeysArgs renameKeysArgs = req.getRenameKeysRequest().getRenameKeysArgs();
+      List<String> lockKeyList = new ArrayList<>();
+      renameKeysArgs.getRenameKeysMapList().forEach(e -> {
+        lockKeyList.add(e.getFromKeyName());
+        lockKeyList.add(e.getToKeyName());
+      });
+      return new OmLockOpr(LockType.W_OBS, renameKeysArgs.getVolumeName(), renameKeysArgs.getBucketName(), lockKeyList);
+    });
+    reqLockMap.put(Type.InitiateMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getInitiateMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.CommitMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getCommitMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.AbortMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getAbortMultiPartUploadRequest().getKeyArgs()));
+    reqLockMap.put(Type.CompleteMultiPartUpload, (om, req) ->
+        getKeyLock(om, req.getCompleteMultiPartUploadRequest().getKeyArgs()));
+ reqLockMap.put(Type.SetTimes, (om, req) -> getKeyLock(om,
req.getSetTimesRequest().getKeyArgs()));
+ }
+
+ private static OmLockOpr aclLockInfo(OzoneManager om,
OzoneManagerProtocolProtos.OzoneObj obj) {
+ try {
+ if (obj.getResType() ==
OzoneManagerProtocolProtos.OzoneObj.ObjectType.VOLUME) {
+ return new OmLockOpr(LockType.R_VOLUME, obj.getPath().substring(1));
+ } else if (obj.getResType() ==
OzoneManagerProtocolProtos.OzoneObj.ObjectType.BUCKET) {
+ ObjectParser objParser = new ObjectParser(obj.getPath(),
OzoneManagerProtocolProtos.OzoneObj.ObjectType.BUCKET);
+ return new OmLockOpr(LockType.RW_VOLUME_BUCKET, objParser.getVolume(),
objParser.getBucket());
+ } else if (obj.getResType() ==
OzoneManagerProtocolProtos.OzoneObj.ObjectType.KEY) {
+ ObjectParser objParser = new ObjectParser(obj.getPath(),
OzoneManagerProtocolProtos.OzoneObj.ObjectType.KEY);
+ String bucketKey =
om.getMetadataManager().getBucketKey(objParser.getVolume(),
objParser.getBucket());
+ OmBucketInfo bucketInfo =
om.getMetadataManager().getBucketTable().get(bucketKey);
+ if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+ return new OmLockOpr(LockType.W_FSO, objParser.getVolume(),
objParser.getBucket(), objParser.getKey());
+ }
+ if
(bucketInfo.getBucketLayout().isObjectStore(om.getEnableFileSystemPaths())) {
+ return new OmLockOpr(LockType.W_OBS, objParser.getVolume(),
objParser.getBucket(), objParser.getKey());
+ }
+      // OBS and legacy FSO follow the same lock path, using the full key path
+ return new OmLockOpr(LockType.W_OBS, objParser.getVolume(),
objParser.getBucket(), objParser.getKey());
+ } else {
+ // prefix type with full path
+ return new OmLockOpr(LockType.WRITE, obj.getPath());
+ }
+ } catch (Exception ex) {
+ return EXP_LOCK;
+ }
+ }
+
+ private static OmLockOpr getTenantLock(OzoneManager om, LockType type,
String tenantId) {
+ try {
+ OmDBTenantState omDBTenantState =
om.getMetadataManager().getTenantStateTable().get(tenantId);
+ return new OmLockOpr(type, omDBTenantState.getBucketNamespaceName());
+ } catch (Exception ex) {
+ return EXP_LOCK;
+ }
+ }
+
+ private static OmLockOpr getKeyLock(OzoneManager om,
OzoneManagerProtocolProtos.KeyArgs args) {
+ try {
+ String bucketKey =
om.getMetadataManager().getBucketKey(args.getVolumeName(),
args.getBucketName());
+ OmBucketInfo bucketInfo =
om.getMetadataManager().getBucketTable().get(bucketKey);
+ if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+ return new OmLockOpr(LockType.W_FSO, args.getVolumeName(),
args.getBucketName(), args.getKeyName());
+ }
+ if
(bucketInfo.getBucketLayout().isObjectStore(om.getEnableFileSystemPaths())) {
+ return new OmLockOpr(LockType.W_OBS, args.getVolumeName(),
args.getBucketName(), args.getKeyName());
+ }
+      // legacy FSO: extract every path component to be created and take a
write lock on each missing parent
+ return new OmLockOpr(LockType.W_LEGACY_FSO, args.getVolumeName(),
args.getBucketName(), args.getKeyName());
+ } catch (Exception ex) {
+ return EXP_LOCK;
+ }
+ }
+ private static OmLockOpr getDeleteKeyLock(OzoneManager om,
OzoneManagerProtocolProtos.DeleteKeyArgs args) {
+ String bucketKey =
om.getMetadataManager().getBucketKey(args.getVolumeName(),
args.getBucketName());
+ try {
+ OmBucketInfo bucketInfo =
om.getMetadataManager().getBucketTable().get(bucketKey);
+ if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+ return new OmLockOpr(LockType.W_FSO, args.getVolumeName(),
args.getBucketName(), args.getKeysList());
+ }
+ return new OmLockOpr(LockType.W_OBS, args.getVolumeName(),
args.getBucketName(), args.getKeysList());
+ } catch (IOException e) {
+ return EXP_LOCK;
+ }
+ }
+ private static OmLockOpr getRenameKey(OzoneManager om,
OzoneManagerProtocolProtos.RenameKeyRequest req) {
+ String bucketKey =
om.getMetadataManager().getBucketKey(req.getKeyArgs().getVolumeName(),
+ req.getKeyArgs().getBucketName());
+ try {
+ OmBucketInfo bucketInfo =
om.getMetadataManager().getBucketTable().get(bucketKey);
+ if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+ return new OmLockOpr(LockType.W_FSO, req.getKeyArgs().getVolumeName(),
req.getKeyArgs().getBucketName(),
+ Arrays.asList(req.getKeyArgs().getKeyName(), req.getToKeyName()));
+ }
+ return new OmLockOpr(LockType.W_OBS, req.getKeyArgs().getVolumeName(),
req.getKeyArgs().getBucketName(),
+ Arrays.asList(req.getKeyArgs().getKeyName(), req.getToKeyName()));
+ } catch (IOException e) {
+ return EXP_LOCK;
+ }
+ }
+
+ public static OmLockOpr getLockOperation(OzoneManager om,
OzoneManagerProtocolProtos.OMRequest req) {
+ return reqLockMap.get(req.getCmdType()).apply(om, req);
+ }
+}
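A minimal sketch of the intended call pattern, assuming OmLockOpr.init() and
OmRequestLockUtils.init() have already run at startup as OMGateway does later
in this patch (the execute() helper below is illustrative only):

    // Sketch: resolve the granular lock for a request, hold it across
    // execution, and always release it, mirroring OMGateway.submit().
    OmLockOpr lockOperation = OmRequestLockUtils.getLockOperation(om, omRequest);
    lockOperation.lock(om);
    try {
      return execute(om, omRequest);   // illustrative execution step
    } finally {
      lockOperation.unlock();
    }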
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java
new file mode 100644
index 0000000000..8a31e22690
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/BucketQuotaResource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks in-flight changes to bucket quota usage.
+ */
+public class BucketQuotaResource {
+ private static BucketQuotaResource quotaResource = new BucketQuotaResource();
+ public static BucketQuotaResource instance() {
+ return quotaResource;
+ }
+
+ private AtomicBoolean track = new AtomicBoolean();
+ private Map<Long, BucketQuota> bucketQuotaMap = new ConcurrentHashMap<>();
+
+ public void enableTrack() {
+ track.set(true);
+ }
+ public boolean isEnabledTrack() {
+ return track.get();
+ }
+
+ public BucketQuota get(Long bucketId) {
+ return bucketQuotaMap.computeIfAbsent(bucketId, k -> new BucketQuota());
+ }
+
+ public void remove(Long bucketId) {
+ bucketQuotaMap.remove(bucketId);
+ }
+
+ public boolean exist(Long bucketId) {
+ return bucketQuotaMap.containsKey(bucketId);
+ }
+
+ /**
+   * Records quota changes for a single bucket.
+ */
+ public static class BucketQuota {
+ private AtomicLong incUsedBytes = new AtomicLong();
+ private AtomicLong incUsedNamespace = new AtomicLong();
+
+ public void addUsedBytes(long bytes) {
+ incUsedBytes.addAndGet(bytes);
+ }
+
+ public long getUsedBytes() {
+ return incUsedBytes.get();
+ }
+
+    public void addUsedNamespace(long count) {
+      incUsedNamespace.addAndGet(count);
+    }
+
+ public long getUsedNamespace() {
+ return incUsedNamespace.get();
+ }
+ }
+}
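A rough usage sketch; tracking is enabled from OMGateway when the leader
executor is on, and the bucket objectId below is illustrative:

    // Sketch: accumulate in-flight quota deltas for a bucket, then subtract
    // the same deltas once the change is flushed to the DB (see
    // OmBucketInfoQuotaTracker.reset() later in this patch).
    BucketQuotaResource resource = BucketQuotaResource.instance();
    resource.enableTrack();
    BucketQuotaResource.BucketQuota quota = resource.get(1001L);
    quota.addUsedBytes(4096);
    quota.addUsedNamespace(1);
    // after the batch commits:
    quota.addUsedBytes(-4096);
    quota.addUsedNamespace(-1);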
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
index bfe51bf4bb..199485d1bd 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
@@ -38,7 +38,7 @@ public class FollowerRequestExecutor {
private final AtomicLong callId = new AtomicLong(0);
private final OzoneManager ozoneManager;
private AtomicLong uniqueIndex;
- private final PoolExecutor<RequestContext> ratisSubmitter;
+ private final PoolExecutor<RequestContext, Void> ratisSubmitter;
private final OzoneManagerRequestHandler handler;
public FollowerRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
@@ -50,7 +50,7 @@ public class FollowerRequestExecutor {
this.handler = null;
}
ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE,
RATIS_TASK_QUEUE_SIZE,
- ozoneManager.getThreadNamePrefix(), this::ratisSubmitCommand, null);
+ ozoneManager.getThreadNamePrefix() + "-FollowerExecutor",
this::ratisSubmitCommand, null);
}
public void stop() {
ratisSubmitter.stop();
@@ -63,7 +63,7 @@ public class FollowerRequestExecutor {
ratisSubmitter.submit(idx, ctx);
}
- private void ratisSubmitCommand(Collection<RequestContext> ctxs,
PoolExecutor<RequestContext> nxtPool) {
+ private void ratisSubmitCommand(Collection<RequestContext> ctxs,
PoolExecutor.CheckedConsumer<Void> nxtPool) {
for (RequestContext ctx : ctxs) {
sendDbUpdateRequest(ctx);
}
@@ -71,12 +71,6 @@ public class FollowerRequestExecutor {
private void sendDbUpdateRequest(RequestContext ctx) {
try {
- if (!ozoneManager.isRatisEnabled()) {
- OzoneManagerProtocolProtos.OMResponse response =
OMBasicStateMachine.runCommand(ctx.getRequest(),
- TermIndex.valueOf(-1, uniqueIndex.incrementAndGet()), handler,
ozoneManager);
- ctx.getFuture().complete(response);
- return;
- }
// TODO: temporary hack to transfer the leader index to follower nodes;
// a proper index-sharing mechanism is needed
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
@@ -84,6 +78,13 @@ public class FollowerRequestExecutor {
reqBuilder.addIndex(uniqueIndex.incrementAndGet());
OzoneManagerProtocolProtos.OMRequest req = ctx.getRequest().toBuilder()
.setPersistDbRequest(reqBuilder.build()).build();
+
+ if (!ozoneManager.isRatisEnabled()) {
+ OzoneManagerProtocolProtos.OMResponse response =
OMBasicStateMachine.runCommand(req,
+ TermIndex.valueOf(-1, uniqueIndex.incrementAndGet()), handler,
ozoneManager);
+ ctx.getFuture().complete(response);
+ return;
+ }
OzoneManagerProtocolProtos.OMResponse response =
ozoneManager.getOmRatisServer().submitRequest(req,
ClientId.randomId(), callId.incrementAndGet());
ctx.getFuture().complete(response);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
index c074918a1f..f213c0a514 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
@@ -26,11 +26,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -54,24 +56,30 @@ public class LeaderRequestExecutor {
private static final Logger LOG =
LoggerFactory.getLogger(LeaderRequestExecutor.class);
private static final int REQUEST_EXECUTOR_POOL_SIZE = 1;
private static final int REQUEST_EXECUTOR_QUEUE_SIZE = 1000;
- private static final int RATIS_TASK_POOL_SIZE = 1;
+ private static final int MERGE_TASK_POOL_SIZE = 1;
+ private static final int MERGE_TASK_QUEUE_SIZE = 1000;
+ private static final int RATIS_TASK_POOL_SIZE = 10;
private static final int RATIS_TASK_QUEUE_SIZE = 1000;
private static final long DUMMY_TERM = -1;
private final AtomicLong uniqueIndex;
private final int ratisByteLimit;
private final OzoneManager ozoneManager;
- private final PoolExecutor<RequestContext> ratisSubmitter;
- private final PoolExecutor<RequestContext> leaderExecutor;
+ private final PoolExecutor<RatisContext, Void> ratisSubmitter;
+ private final PoolExecutor<RequestContext, RatisContext> requestMerger;
+ private final PoolExecutor<RequestContext, RequestContext> leaderExecutor;
private final OzoneManagerRequestHandler handler;
private final AtomicBoolean isEnabled = new AtomicBoolean(true);
+ private final AtomicInteger ratisCurrentPool = new AtomicInteger(0);
public LeaderRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
this.ozoneManager = om;
this.handler = new OzoneManagerRequestHandler(ozoneManager);
- ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE,
- RATIS_TASK_QUEUE_SIZE, ozoneManager.getThreadNamePrefix(),
this::ratisSubmitCommand, null);
+ ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE,
RATIS_TASK_QUEUE_SIZE,
+ ozoneManager.getThreadNamePrefix() + "-LeaderRatis",
this::ratisCommand, null);
+ requestMerger = new PoolExecutor<>(MERGE_TASK_POOL_SIZE,
MERGE_TASK_QUEUE_SIZE,
+ ozoneManager.getThreadNamePrefix() + "-LeaderMerger",
this::requestMergeCommand, this::ratisSubmit);
leaderExecutor = new PoolExecutor<>(REQUEST_EXECUTOR_POOL_SIZE,
REQUEST_EXECUTOR_QUEUE_SIZE,
- ozoneManager.getThreadNamePrefix(), this::runExecuteCommand,
ratisSubmitter);
+ ozoneManager.getThreadNamePrefix() + "-LeaderExecutor",
this::runExecuteCommand, this::mergeSubmit);
int limit = (int) ozoneManager.getConfiguration().getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
@@ -82,6 +90,7 @@ public class LeaderRequestExecutor {
}
public void stop() {
leaderExecutor.stop();
+ requestMerger.stop();
ratisSubmitter.stop();
}
public int batchSize() {
@@ -99,39 +108,45 @@ public class LeaderRequestExecutor {
public void submit(int idx, RequestContext ctx) throws InterruptedException {
if (!isEnabled.get()) {
- rejectRequest(ctx);
+ rejectRequest(Collections.singletonList(ctx));
return;
}
- leaderExecutor.submit(idx, ctx);
+ executeRequest(ctx, this::mergeSubmit);
+ //leaderExecutor.submit(idx, ctx);
}
- private void rejectRequest(RequestContext ctx) {
+ private void rejectRequest(Collection<RequestContext> ctxs) {
+ Throwable th;
if (!ozoneManager.isLeaderReady()) {
String peerId = ozoneManager.isRatisEnabled() ?
ozoneManager.getOmRatisServer().getRaftPeerId().toString()
: ozoneManager.getOMNodeId();
- OMLeaderNotReadyException leaderNotReadyException = new
OMLeaderNotReadyException(peerId
- + " is not ready to process request yet.");
- ctx.getFuture().completeExceptionally(leaderNotReadyException);
+ th = new OMLeaderNotReadyException(peerId + " is not ready to process
request yet.");
} else {
- ctx.getFuture().completeExceptionally(new OMException("Request
processing is disabled due to error",
- OMException.ResultCodes.INTERNAL_ERROR));
+ th = new OMException("Request processing is disabled due to error",
OMException.ResultCodes.INTERNAL_ERROR);
}
- }
- private void rejectRequest(Collection<RequestContext> ctxs) {
- ctxs.forEach(ctx -> rejectRequest(ctx));
+ handleBatchUpdateComplete(ctxs, th, null);
}
- private void runExecuteCommand(Collection<RequestContext> ctxs,
PoolExecutor<RequestContext> nxtPool) {
+ private void runExecuteCommand(
+ Collection<RequestContext> ctxs,
PoolExecutor.CheckedConsumer<RequestContext> nxtPool) {
+ if (!isEnabled.get()) {
+ rejectRequest(ctxs);
+ return;
+ }
for (RequestContext ctx : ctxs) {
if (!isEnabled.get()) {
- rejectRequest(ctx);
- return;
+ rejectRequest(Collections.singletonList(ctx));
+ continue;
}
executeRequest(ctx, nxtPool);
}
}
- private void executeRequest(RequestContext ctx, PoolExecutor<RequestContext>
nxtPool) {
+ private void mergeSubmit(RequestContext ctx) throws InterruptedException {
+ requestMerger.submit(0, ctx);
+ }
+
+ private void executeRequest(RequestContext ctx,
PoolExecutor.CheckedConsumer<RequestContext> nxtPool) {
OMRequest request = ctx.getRequest();
TermIndex termIndex = TermIndex.valueOf(DUMMY_TERM,
uniqueIndex.incrementAndGet());
ctx.setIndex(termIndex);
@@ -146,7 +161,7 @@ public class LeaderRequestExecutor {
} finally {
if (ctx.getNextRequest() != null) {
try {
- nxtPool.submit(0, ctx);
+ nxtPool.accept(ctx);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -164,7 +179,8 @@ public class LeaderRequestExecutor {
if (!omClientResponse.getOMResponse().getSuccess()) {
OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex);
} else {
- OzoneManagerProtocolProtos.PersistDbRequest.Builder nxtRequest =
retrieveDbChanges(termIndex, omClientResponse);
+ OzoneManagerProtocolProtos.PersistDbRequest.Builder nxtRequest
+ = retrieveDbChanges(ctx, termIndex, omClientResponse);
if (nxtRequest != null) {
OMRequest.Builder omReqBuilder =
OMRequest.newBuilder().setPersistDbRequest(nxtRequest.build())
.setCmdType(OzoneManagerProtocolProtos.Type.PersistDb);
@@ -181,16 +197,25 @@ public class LeaderRequestExecutor {
}
private OzoneManagerProtocolProtos.PersistDbRequest.Builder
retrieveDbChanges(
- TermIndex termIndex, OMClientResponse omClientResponse) throws
IOException {
- try (BatchOperation batchOperation =
ozoneManager.getMetadataManager().getStore()
+ RequestContext ctx, TermIndex termIndex, OMClientResponse
omClientResponse) throws IOException {
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+ String name = metadataManager.getBucketTable().getName();
+ boolean isDbChanged = false;
+ try (BatchOperation batchOperation = metadataManager.getStore()
.initBatchOperation()) {
- omClientResponse.checkAndUpdateDB(ozoneManager.getMetadataManager(),
batchOperation);
- // get db update and raise request to flush
+ omClientResponse.checkAndUpdateDB(metadataManager, batchOperation);
+ // get db update and create request to flush
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
= OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
Map<String, Map<ByteBuffer, ByteBuffer>> cachedDbTxs
= ((RDBBatchOperation) batchOperation).getCachedTransaction();
for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> tblEntry :
cachedDbTxs.entrySet()) {
+ isDbChanged = true;
+ if (tblEntry.getKey().equals(name)) {
+ if (ctx.getClientRequest().getWrappedBucketInfo() instanceof
OmBucketInfoQuotaTracker) {
+ continue;
+ }
+ }
OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder
= OzoneManagerProtocolProtos.DBTableUpdate.newBuilder();
tblBuilder.setTableName(tblEntry.getKey());
@@ -205,7 +230,7 @@ public class LeaderRequestExecutor {
}
reqBuilder.addTableUpdates(tblBuilder.build());
}
- if (reqBuilder.getTableUpdatesCount() == 0) {
+ if (!isDbChanged) {
return null;
}
reqBuilder.addIndex(termIndex.getIndex());
@@ -213,11 +238,13 @@ public class LeaderRequestExecutor {
}
}
- private void ratisSubmitCommand(Collection<RequestContext> ctxs,
PoolExecutor<RequestContext> nxtPool) {
+ private void requestMergeCommand(
+ Collection<RequestContext> ctxs,
PoolExecutor.CheckedConsumer<RatisContext> nxtPool) {
if (!isEnabled.get()) {
rejectRequest(ctxs);
return;
}
+ Map<Long, OzoneManagerProtocolProtos.BucketQuotaCount.Builder>
bucketChangeMap = new HashMap<>();
List<RequestContext> sendList = new ArrayList<>();
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
= OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
@@ -230,16 +257,19 @@ public class LeaderRequestExecutor {
}
if ((tmpSize + size) > ratisByteLimit) {
// send current batched request
- prepareAndSendRequest(sendList, reqBuilder);
+ appendBucketQuotaChanges(reqBuilder, bucketChangeMap);
+ prepareAndSendRequest(sendList, reqBuilder, nxtPool);
// reinit and continue
reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
size = 0;
sendList.clear();
+ bucketChangeMap.clear();
}
// keep adding to batch list
size += tmpSize;
+ addBucketQuotaChanges(ctx, bucketChangeMap);
for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) {
OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder
= OzoneManagerProtocolProtos.DBTableUpdate.newBuilder();
@@ -251,30 +281,81 @@ public class LeaderRequestExecutor {
sendList.add(ctx);
}
if (sendList.size() > 0) {
- prepareAndSendRequest(sendList, reqBuilder);
+ appendBucketQuotaChanges(reqBuilder, bucketChangeMap);
+ prepareAndSendRequest(sendList, reqBuilder, nxtPool);
+ }
+ }
+
+ private void ratisSubmit(RatisContext ctx) throws InterruptedException {
+    // simple round-robin strategy to pick the ratis pool for the next set
of merged requests
+ int nxtIndex = Math.abs(ratisCurrentPool.getAndIncrement() %
RATIS_TASK_POOL_SIZE);
+ ratisSubmitter.submit(nxtIndex, ctx);
+ }
+
+ private void addBucketQuotaChanges(
+ RequestContext ctx, Map<Long,
OzoneManagerProtocolProtos.BucketQuotaCount.Builder> quotaMap) {
+ if (ctx.getClientRequest().getWrappedBucketInfo() instanceof
OmBucketInfoQuotaTracker) {
+ OmBucketInfoQuotaTracker info = (OmBucketInfoQuotaTracker)
ctx.getClientRequest().getWrappedBucketInfo();
+ OzoneManagerProtocolProtos.BucketQuotaCount.Builder quotaBuilder =
quotaMap.computeIfAbsent(
+ info.getObjectID(), k ->
OzoneManagerProtocolProtos.BucketQuotaCount.newBuilder()
+
.setVolName(info.getVolumeName()).setBucketName(info.getBucketName())
+
.setBucketObjectId(info.getObjectID()).setSupportOldQuota(false));
+ quotaBuilder.setDiffUsedBytes(quotaBuilder.getDiffUsedBytes() +
info.getIncUsedBytes());
+ quotaBuilder.setDiffUsedNamespace(quotaBuilder.getDiffUsedNamespace() +
info.getIncUsedNamespace());
+ }
+ }
+
+ private void appendBucketQuotaChanges(
+ OzoneManagerProtocolProtos.PersistDbRequest.Builder req,
+ Map<Long, OzoneManagerProtocolProtos.BucketQuotaCount.Builder> quotaMap)
{
+ for (Map.Entry<Long, OzoneManagerProtocolProtos.BucketQuotaCount.Builder>
entry : quotaMap.entrySet()) {
+ if (entry.getValue().getDiffUsedBytes() == 0 &&
entry.getValue().getDiffUsedNamespace() == 0) {
+ continue;
+ }
+ req.addBucketQuotaCount(entry.getValue().build());
}
}
private void prepareAndSendRequest(
- List<RequestContext> sendList,
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder) {
+ List<RequestContext> sendList,
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder,
+ PoolExecutor.CheckedConsumer<RatisContext> nxtPool) {
RequestContext lastReqCtx = sendList.get(sendList.size() - 1);
OMRequest.Builder omReqBuilder =
OMRequest.newBuilder().setPersistDbRequest(reqBuilder.build())
.setCmdType(OzoneManagerProtocolProtos.Type.PersistDb)
.setClientId(lastReqCtx.getRequest().getClientId());
+ OMRequest reqBatch = omReqBuilder.build();
try {
- OMRequest reqBatch = omReqBuilder.build();
- OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch,
lastReqCtx.getIndex());
- if (!dbUpdateRsp.getSuccess()) {
- throw new OMException(dbUpdateRsp.getMessage(),
-
OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]);
- }
- handleBatchUpdateComplete(sendList, null,
dbUpdateRsp.getLeaderOMNodeId());
- } catch (Throwable e) {
- LOG.warn("Failed to write, Exception occurred ", e);
+ nxtPool.accept(new RatisContext(sendList, reqBatch));
+ } catch (InterruptedException e) {
handleBatchUpdateComplete(sendList, e, null);
+ Thread.currentThread().interrupt();
}
}
+ private void ratisCommand(Collection<RatisContext> ctxs,
PoolExecutor.CheckedConsumer<Void> nxtPool) {
+ if (!isEnabled.get()) {
+ for (RatisContext ctx : ctxs) {
+ rejectRequest(ctx.getRequestContexts());
+ }
+ return;
+ }
+ for (RatisContext ctx : ctxs) {
+ List<RequestContext> sendList = ctx.getRequestContexts();
+ RequestContext lastReqCtx = sendList.get(sendList.size() - 1);
+ OMRequest reqBatch = ctx.getRequest();
+ try {
+ OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch,
lastReqCtx.getIndex());
+ if (!dbUpdateRsp.getSuccess()) {
+ throw new OMException(dbUpdateRsp.getMessage(),
+
OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]);
+ }
+ handleBatchUpdateComplete(sendList, null,
dbUpdateRsp.getLeaderOMNodeId());
+ } catch (Throwable e) {
+ LOG.warn("Failed to write, Exception occurred ", e);
+ handleBatchUpdateComplete(sendList, e, null);
+ }
+ }
+ }
private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex
termIndex) throws Exception {
try {
if (!ozoneManager.isRatisEnabled()) {
@@ -300,8 +381,29 @@ public class LeaderRequestExecutor {
return omResponse;
}
private void handleBatchUpdateComplete(Collection<RequestContext> ctxs,
Throwable th, String leaderOMNodeId) {
+    // TODO: with the no-cache switch, this cache cleanup becomes unnecessary
Map<String, List<Long>> cleanupMap = new HashMap<>();
for (RequestContext ctx : ctxs) {
+ // cache cleanup
+ if (null != ctx.getNextRequest()) {
+ List<OzoneManagerProtocolProtos.DBTableUpdate> tblList =
ctx.getNextRequest().getTableUpdatesList();
+ for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) {
+ List<Long> epochs =
cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>());
+ epochs.add(ctx.getIndex().getIndex());
+ }
+ }
+    }
+    // clean up the cache once, after collecting epochs from all contexts
+    for (Map.Entry<String, List<Long>> entry : cleanupMap.entrySet()) {
+
ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue());
+    }
+
+ for (RequestContext ctx : ctxs) {
+ if (ctx.getClientRequest().getWrappedBucketInfo() instanceof
OmBucketInfoQuotaTracker) {
+        // the reset must run on both success and failure to update the
resource quota;
+        // even on success it is done here, since execution at every node is
hard to guarantee
+ ((OmBucketInfoQuotaTracker)
ctx.getClientRequest().getWrappedBucketInfo()).reset();
+ }
+
if (th != null) {
OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(),
ctx.getClientRequest(), ozoneManager,
ctx.getIndex(), th);
@@ -310,9 +412,6 @@ public class LeaderRequestExecutor {
} else {
ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), new
IOException(th)));
}
-
- // TODO: no-cache, remove disable processing, let every request deal
with ratis failure
- disableProcessing();
} else {
OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(),
ctx.getIndex());
OMResponse newRsp = ctx.getResponse();
@@ -321,19 +420,22 @@ public class LeaderRequestExecutor {
}
ctx.getFuture().complete(newRsp);
}
+ }
+ }
- // cache cleanup
- if (null != ctx.getNextRequest()) {
- List<OzoneManagerProtocolProtos.DBTableUpdate> tblList =
ctx.getNextRequest().getTableUpdatesList();
- for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) {
- List<Long> epochs =
cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>());
- epochs.add(ctx.getIndex().getIndex());
- }
- }
+ static class RatisContext {
+ private List<RequestContext> ctxs;
+ private OMRequest req;
+ RatisContext(List<RequestContext> ctxs, OMRequest req) {
+ this.ctxs = ctxs;
+ this.req = req;
+ }
+ public List<RequestContext> getRequestContexts() {
+ return ctxs;
}
- // TODO: no-cache, no need cleanup cache
- for (Map.Entry<String, List<Long>> entry : cleanupMap.entrySet()) {
-
ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue());
+
+ public OMRequest getRequest() {
+ return req;
}
}
}
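The executor is now a three-stage pipeline (execute -> merge -> ratis submit)
chained through PoolExecutor callbacks. A condensed sketch of the merge
stage's size-based batching, with the protobuf details elided and
estimateSize() and flush() as illustrative stand-ins:

    // Sketch: accumulate contexts until the next one would push the batch
    // past the ratis byte limit, then flush and start a new batch.
    List<RequestContext> batch = new ArrayList<>();
    long size = 0;
    for (RequestContext ctx : ctxs) {
      long tmpSize = estimateSize(ctx);
      if (size + tmpSize > ratisByteLimit && !batch.isEmpty()) {
        flush(batch);      // i.e. prepareAndSendRequest(...)
        batch.clear();
        size = 0;
      }
      size += tmpSize;
      batch.add(ctx);
    }
    if (!batch.isEmpty()) {
      flush(batch);
    }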
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
index fd2962ac5b..6b9cb06836 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
@@ -154,7 +154,10 @@ public class OMBasicStateMachine extends BaseStateMachine {
@Override
protected synchronized boolean updateLastAppliedTermIndex(TermIndex
newTermIndex) {
- return super.updateLastAppliedTermIndex(newTermIndex);
+ if (getLastAppliedTermIndex().getIndex() < newTermIndex.getIndex()) {
+ return super.updateLastAppliedTermIndex(newTermIndex);
+ }
+ return true;
}
/**
@@ -239,6 +242,14 @@ public class OMBasicStateMachine extends BaseStateMachine {
final OMRequest request = context != null ? (OMRequest) context
: OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
+    // Prepare has special handling during execution: a snapshot is taken as
part of this operation,
+    // so applyTransaction must finish before it is handled.
+    // The leader guarantees that no other execution runs concurrently.
+ if (request.getCmdType() == OzoneManagerProtocolProtos.Type.Prepare) {
+ return CompletableFuture.supplyAsync(() ->
+ OMRatisHelper.convertResponseToMessage(runCommand(request,
termIndex, handler, ozoneManager)),
+ installSnapshotExecutor);
+ }
OMResponse response = runCommand(request, termIndex, handler,
ozoneManager);
CompletableFuture<Message> future = new CompletableFuture<>();
future.complete(OMRatisHelper.convertResponseToMessage(response));
@@ -377,11 +388,12 @@ public class OMBasicStateMachine extends BaseStateMachine
{
index = transactionInfo.getIndex();
}
try {
+      // the leader index is shared with followers for follower-side execution
if (request.hasPersistDbRequest() &&
request.getPersistDbRequest().getIndexCount() > 0) {
index =
Math.max(Collections.max(request.getPersistDbRequest().getIndexList()).longValue(),
index);
}
+      // TODO: temporary way to pass the index used for object ID creation
during follower execution
TermIndex objectIndex = termIndex;
- // TODO temp fix for index sharing from leader to follower in follower
execution
if (request.getCmdType() != OzoneManagerProtocolProtos.Type.PersistDb
&& request.getCmdType() !=
OzoneManagerProtocolProtos.Type.Prepare) {
objectIndex = TermIndex.valueOf(termIndex.getTerm(), index);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
index 8942d04439..c2c9333f6d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
@@ -18,26 +18,20 @@ package org.apache.hadoop.ozone.om.ratis.execution;
import com.google.protobuf.ServiceException;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.TypedTable;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProcessingDetails;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.lock.OmLockOpr;
+import org.apache.hadoop.ozone.om.lock.OmRequestLockUtils;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -46,7 +40,6 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +52,7 @@ public class OMGateway {
private final FollowerRequestExecutor followerExecutor;
private final OzoneManager om;
private final AtomicLong requestInProgress = new AtomicLong(0);
+ private final AtomicInteger leaderExecCurrIdx = new AtomicInteger(0);
/**
* uniqueIndex generates the index used in objectId creation, unique across
OM nodes.
* It is initialized from termIndex, shifted within 54 bits.
@@ -67,6 +61,8 @@ public class OMGateway {
public OMGateway(OzoneManager om) throws IOException {
this.om = om;
+ OmLockOpr.init(om.getThreadNamePrefix());
+ OmRequestLockUtils.init();
this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex);
this.followerExecutor = new FollowerRequestExecutor(om, uniqueIndex);
if (om.isLeaderExecutorEnabled() && om.isRatisEnabled()) {
@@ -84,17 +80,22 @@ public class OMGateway {
// for non-ratis flow, init with last index
uniqueIndex.set(om.getLastTrxnIndexForNonRatis());
}
+ if (om.isLeaderExecutorEnabled()) {
+ BucketQuotaResource.instance().enableTrack();
+ }
}
public void stop() {
leaderExecutor.stop();
followerExecutor.stop();
+ OmLockOpr.stop();
}
public OMResponse submit(OMRequest omRequest) throws ServiceException {
if (!om.isLeaderReady()) {
- String peerId = om.isRatisEnabled() ?
om.getOmRatisServer().getRaftPeerId().toString() : om.getOMNodeId();
- OMLeaderNotReadyException leaderNotReadyException = new
OMLeaderNotReadyException(peerId
- + " is not ready to process request yet.");
- throw new ServiceException(leaderNotReadyException);
+ try {
+ om.checkLeaderStatus();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
executorEnable();
RequestContext requestContext = new RequestContext();
@@ -103,40 +104,73 @@ public class OMGateway {
requestContext.setFuture(new CompletableFuture<>());
CompletableFuture<OMResponse> f = requestContext.getFuture()
.whenComplete((r, th) -> handleAfterExecution(requestContext, th));
+ OmLockOpr lockOperation = OmRequestLockUtils.getLockOperation(om,
omRequest);
try {
- // TODO gateway locking: take lock with OMLockDetails
// TODO scheduling of request to pool
- om.checkLeaderStatus();
- validate(omRequest);
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, om);
+ lockOperation.lock(om);
requestContext.setClientRequest(omClientRequest);
+ validate(omRequest);
+ ensurePreviousRequestCompletionForPrepare(omRequest);
+
// submit request
ExecutorType executorType = executorSelector(omRequest);
if (executorType == ExecutorType.LEADER_COMPATIBLE) {
- leaderExecutor.submit(0, requestContext);
+ int idx = Math.abs(leaderExecCurrIdx.getAndIncrement() %
leaderExecutor.batchSize());
+ leaderExecutor.submit(idx, requestContext);
} else if (executorType == ExecutorType.FOLLOWER) {
followerExecutor.submit(0, requestContext);
} else {
leaderExecutor.submit(0, requestContext);
}
+
+ try {
+ return f.get();
+ } catch (ExecutionException ex) {
+ if (ex.getCause() != null) {
+ throw new ServiceException(ex.getMessage(), ex.getCause());
+ } else {
+ throw new ServiceException(ex.getMessage(), ex);
+ }
+ }
} catch (InterruptedException e) {
- requestContext.getFuture().completeExceptionally(e);
+ LOG.error("Interrupted while handling request", e);
Thread.currentThread().interrupt();
+ throw new ServiceException(e.getMessage(), e);
+ } catch (ServiceException e) {
+ throw e;
} catch (Throwable e) {
- requestContext.getFuture().completeExceptionally(e);
+ LOG.error("Exception occurred while handling request", e);
+ throw new ServiceException(e.getMessage(), e);
+ } finally {
+ lockOperation.unlock();
+ Server.Call call = Server.getCurCall().get();
+ if (null != call) {
+ OMLockDetails lockDetails = lockOperation.getLockDetails();
+ call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKWAIT,
+ lockDetails.getWaitLockNanos(), TimeUnit.NANOSECONDS);
+ call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKSHARED,
+ lockDetails.getReadLockNanos(), TimeUnit.NANOSECONDS);
+ call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKEXCLUSIVE,
+ lockDetails.getWriteLockNanos(), TimeUnit.NANOSECONDS);
+ }
}
- try {
- return f.get();
- } catch (ExecutionException ex) {
- throw new ServiceException(ex.getMessage(), ex);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new ServiceException(ex.getMessage(), ex);
+ }
+
+ private void ensurePreviousRequestCompletionForPrepare(OMRequest omRequest)
throws InterruptedException {
+    // for a prepare request, other requests are discarded before this is
called
+ if (omRequest.getCmdType() == OzoneManagerProtocolProtos.Type.Prepare) {
+ for (int cnt = 0; cnt < 60 && requestInProgress.get() > 1; ++cnt) {
+ Thread.sleep(1000);
+ }
+ if (requestInProgress.get() > 1) {
+        LOG.warn("{} requests are still in progress, continuing with
prepare", (requestInProgress.get() - 1));
+ }
}
}
- private void validate(OMRequest omRequest) throws IOException {
+ private synchronized void validate(OMRequest omRequest) throws IOException {
OzoneManagerRequestHandler.requestParamValidation(omRequest);
// validate prepare state
OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
@@ -165,14 +199,12 @@ public class OMGateway {
}
}
private void handleAfterExecution(RequestContext ctx, Throwable th) {
- // TODO: gateway locking: release lock and OMLockDetails update
requestInProgress.decrementAndGet();
}
public void leaderChangeNotifier(String newLeaderId) {
boolean isLeader = om.getOMNodeId().equals(newLeaderId);
if (isLeader) {
- cleanupCache();
resetUniqueIndex();
} else {
leaderExecutor.disableProcessing();
@@ -194,77 +226,11 @@ public class OMGateway {
}
}
- private void rebuildBucketVolumeCache() throws IOException {
- LOG.info("Rebuild of bucket and volume cache");
- Table<String, OmBucketInfo> bucketTable =
om.getMetadataManager().getBucketTable();
- Set<String> cachedBucketKeySet = new HashSet<>();
- Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> cacheItr =
bucketTable.cacheIterator();
- while (cacheItr.hasNext()) {
- cachedBucketKeySet.add(cacheItr.next().getKey().getCacheKey());
- }
- try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
bucItr = bucketTable.iterator()) {
- while (bucItr.hasNext()) {
- Table.KeyValue<String, OmBucketInfo> next = bucItr.next();
- bucketTable.addCacheEntry(next.getKey(), next.getValue(), -1);
- cachedBucketKeySet.remove(next.getKey());
- }
- }
-
- // removing extra cache entry
- for (String key : cachedBucketKeySet) {
- bucketTable.addCacheEntry(key, -1);
- }
-
- Set<String> cachedVolumeKeySet = new HashSet<>();
- Table<String, OmVolumeArgs> volumeTable =
om.getMetadataManager().getVolumeTable();
- Iterator<Map.Entry<CacheKey<String>, CacheValue<OmVolumeArgs>>>
volCacheItr = volumeTable.cacheIterator();
- while (volCacheItr.hasNext()) {
- cachedVolumeKeySet.add(volCacheItr.next().getKey().getCacheKey());
- }
- try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
volItr = volumeTable.iterator()) {
- while (volItr.hasNext()) {
- Table.KeyValue<String, OmVolumeArgs> next = volItr.next();
- volumeTable.addCacheEntry(next.getKey(), next.getValue(), -1);
- cachedVolumeKeySet.remove(next.getKey());
- }
- }
-
- // removing extra cache entry
- for (String key : cachedVolumeKeySet) {
- volumeTable.addCacheEntry(key, -1);
- }
- }
-
- public void cleanupCache() {
- // TODO no-cache case, no need re-build bucket/volume cache and cleanup of
cache
- LOG.debug("clean all table cache and update bucket/volume with db");
- for (String tbl : om.getMetadataManager().listTableNames()) {
- Table table = om.getMetadataManager().getTable(tbl);
- if (table instanceof TypedTable) {
- ArrayList<Long> epochs = new ArrayList<>(((TypedTable<?, ?>)
table).getCache().getEpochEntries().keySet());
- if (!epochs.isEmpty()) {
- table.cleanupCache(epochs);
- }
- }
- }
- try {
- rebuildBucketVolumeCache();
- } catch (IOException e) {
- // retry once, else om down
- try {
- rebuildBucketVolumeCache();
- } catch (IOException ex) {
- String errorMessage = "OM unable to access rocksdb, terminating OM.
Error " + ex.getMessage();
- ExitUtils.terminate(1, errorMessage, ex, LOG);
- }
- }
- }
public void executorEnable() throws ServiceException {
if (leaderExecutor.isProcessing()) {
return;
}
if (requestInProgress.get() == 0) {
- cleanupCache();
leaderExecutor.enableProcessing();
} else {
LOG.warn("Executor is not enabled, previous request {} is still not
cleaned", requestInProgress.get());
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OmBucketInfoQuotaTracker.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OmBucketInfoQuotaTracker.java
new file mode 100644
index 0000000000..62d5e8098d
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OmBucketInfoQuotaTracker.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+
+/**
+ * An OmBucketInfo wrapper that records in-flight quota changes in the
shared BucketQuotaResource.
+ */
+public final class OmBucketInfoQuotaTracker extends OmBucketInfo {
+ private BucketQuotaResource.BucketQuota resource;
+ private long incUsedBytes;
+ private long incUsedNamespace;
+
+ public static OmBucketInfoQuotaTracker convert(OmBucketInfo info) {
+ Builder builder = info.toBuilder();
+ return createObject(info, builder);
+ }
+
+ private OmBucketInfoQuotaTracker(Builder b) {
+ super(b);
+ resource = BucketQuotaResource.instance().get(b.getObjectID());
+ }
+
+ @Override
+ public void incrUsedBytes(long bytes) {
+ incUsedBytes += bytes;
+ resource.addUsedBytes(bytes);
+ }
+
+ @Override
+ public void incrUsedNamespace(long namespaceToUse) {
+ incUsedNamespace += namespaceToUse;
+ resource.addUsedNamespace(namespaceToUse);
+ }
+
+ @Override
+ public long getUsedBytes() {
+ return resource.getUsedBytes() + super.getUsedBytes();
+ }
+
+ @Override
+ public long getUsedNamespace() {
+ return resource.getUsedNamespace() + super.getUsedNamespace();
+ }
+
+ public long getIncUsedBytes() {
+ return incUsedBytes;
+ }
+
+ public long getIncUsedNamespace() {
+ return incUsedNamespace;
+ }
+
+ /**
+   * Resets the shared resource counters, since the bucket info in the DB
and cache is updated on flush.
+ */
+ public void reset() {
+ resource.addUsedBytes(-incUsedBytes);
+ resource.addUsedNamespace(-incUsedNamespace);
+ }
+
+ @Override
+ public OmBucketInfoQuotaTracker copyObject() {
+ Builder builder = toBuilder();
+ return createObject(this, builder);
+ }
+
+ private static OmBucketInfoQuotaTracker createObject(OmBucketInfo info,
Builder builder) {
+ if (info.getEncryptionKeyInfo() != null) {
+ builder.setBucketEncryptionKey(info.getEncryptionKeyInfo().copy());
+ }
+
+ if (info.getDefaultReplicationConfig() != null) {
+
builder.setDefaultReplicationConfig(info.getDefaultReplicationConfig().copy());
+ }
+
+ Preconditions.checkNotNull(info.getVolumeName());
+ Preconditions.checkNotNull(info.getBucketName());
+ Preconditions.checkNotNull(info.getAcls());
+ Preconditions.checkNotNull(info.getStorageType());
+ return new OmBucketInfoQuotaTracker(builder);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+}
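A short sketch of how the tracker is meant to be used, assuming bucketInfo
and blockSize come from the surrounding request handling:

    // Sketch: wrap a bucket's info so quota increments are mirrored into the
    // shared BucketQuotaResource until the batch is flushed.
    OmBucketInfo tracked = OmBucketInfoQuotaTracker.convert(bucketInfo);
    tracked.incrUsedBytes(blockSize);   // also recorded in the shared resource
    tracked.incrUsedNamespace(1);
    long effective = tracked.getUsedBytes(); // DB value plus in-flight deltas
    // after PersistDb commits the change, the in-flight delta is removed:
    ((OmBucketInfoQuotaTracker) tracked).reset();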
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
index fe9ae728fa..45a461b849 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
@@ -28,29 +28,29 @@ import java.util.function.BiConsumer;
/**
* Pool executor.
*/
-public class PoolExecutor <T> {
+public class PoolExecutor <T, Q> {
private Thread[] threadPool;
private List<BlockingQueue<T>> queues;
- private BiConsumer<Collection<T>, PoolExecutor<T>> handler = null;
- private PoolExecutor<T> nxtPool;
+ private BiConsumer<Collection<T>, CheckedConsumer<Q>> handler = null;
+ private CheckedConsumer<Q> submitter;
private AtomicBoolean isRunning = new AtomicBoolean(true);
private PoolExecutor(int poolSize, int queueSize, String threadPrefix) {
threadPool = new Thread[poolSize];
queues = new ArrayList<>(poolSize);
for (int i = 0; i < poolSize; ++i) {
- LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(1000);
+ LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(queueSize);
queues.add(queue);
- threadPool[i] = new Thread(() -> execute(queue), threadPrefix +
"OMExecutor-" + i);
+ threadPool[i] = new Thread(() -> execute(queue), threadPrefix + "-" + i);
threadPool[i].start();
}
}
public PoolExecutor(
- int poolSize, int queueSize, String threadPrefix,
BiConsumer<Collection<T>, PoolExecutor<T>> handler,
- PoolExecutor<T> nxtPool) {
+ int poolSize, int queueSize, String threadPrefix,
BiConsumer<Collection<T>, CheckedConsumer<Q>> handler,
+ CheckedConsumer<Q> submitter) {
this(poolSize, queueSize, threadPrefix);
this.handler = handler;
- this.nxtPool = nxtPool;
+ this.submitter = submitter;
}
public void submit(int idx, T task) throws InterruptedException {
if (idx < 0 || idx >= threadPool.length) {
@@ -66,7 +66,7 @@ public class PoolExecutor <T> {
T task = q.take();
entries.add(task);
q.drainTo(entries);
- handler.accept(entries, nxtPool);
+ handler.accept(entries, submitter);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
@@ -84,4 +84,12 @@ public class PoolExecutor <T> {
}
}
}
+
+ /**
+   * A checked functional interface: a consumer that may throw
InterruptedException, used to chain executor stages.
+ */
+ @FunctionalInterface
+ public interface CheckedConsumer<S> {
+ void accept(S s) throws InterruptedException;
+ }
}
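A small self-contained sketch of chaining two stages through CheckedConsumer,
the same shape LeaderRequestExecutor uses (executor -> merger -> ratis
submitter); the stage names and payloads are illustrative:

    // Sketch: stage1 transforms items and hands them to stage2 via the
    // submitter callback; stage2 consumes them.
    PoolExecutor<String, Void> stage2 = new PoolExecutor<>(1, 100, "stage2",
        (items, next) -> items.forEach(System.out::println), null);
    PoolExecutor<String, String> stage1 = new PoolExecutor<>(1, 100, "stage1",
        (items, next) -> {
          for (String item : items) {
            try {
              next.accept(item.toUpperCase());
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        }, item -> stage2.submit(0, item));
    stage1.submit(0, "request");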
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 663f350718..c3dbed9e67 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -514,12 +514,11 @@ public final class OzoneManagerRatisUtils {
public static OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManager om, OMRequest omRequest, ClientId clientId, long callId)
throws ServiceException {
+ if (om.isLeaderExecutorEnabled()) {
+ return om.getOMGateway().submit(omRequest);
+ }
if (om.isRatisEnabled()) {
- if (om.isLeaderExecutorEnabled()) {
- return om.getOMGateway().submit(omRequest);
- } else {
- return om.getOmRatisServer().submitRequest(omRequest, clientId,
callId);
- }
+ return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
} else {
return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER,
omRequest);
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 17f9663ae1..05e1066271 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ozone.OmUtils;
@@ -586,4 +587,7 @@ public abstract class OMClientRequest implements
RequestAuditor {
public void mergeOmLockDetails(OMLockDetails details) {
omLockDetails.merge(details);
}
+ public OmBucketInfo getWrappedBucketInfo() {
+ return null;
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
index 29e57ae916..697c86cf21 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.ozone.audit.OMSystemAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -41,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
/**
* Handle OMQuotaRepairRequest Request.
@@ -66,15 +70,25 @@ public class OMPersistDbRequest extends OMClientRequest {
@SuppressWarnings("methodlength")
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
TermIndex termIndex) {
OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+ Table<String, OmBucketInfo> bucketTable = metadataManager.getBucketTable();
+ Table<String, OmVolumeArgs> volumeTable = metadataManager.getVolumeTable();
OzoneManagerProtocolProtos.PersistDbRequest dbUpdateRequest =
getOmRequest().getPersistDbRequest();
+ List<OzoneManagerProtocolProtos.DBTableUpdate> tableUpdatesList =
dbUpdateRequest.getTableUpdatesList();
- OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
try (BatchOperation batchOperation = metadataManager.getStore()
.initBatchOperation()) {
- List<OzoneManagerProtocolProtos.DBTableUpdate> tableUpdatesList =
dbUpdateRequest.getTableUpdatesList();
for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates :
tableUpdatesList) {
Table table = metadataManager.getTable(tblUpdates.getTableName());
List<OzoneManagerProtocolProtos.DBTableRecord> recordsList =
tblUpdates.getRecordsList();
+ if (bucketTable.getName().equals(tblUpdates.getTableName())) {
+ updateBucketRecord(bucketTable, batchOperation, recordsList);
+ continue;
+ }
+ if (volumeTable.getName().equals(tblUpdates.getTableName())) {
+ updateVolumeRecord(volumeTable, batchOperation, recordsList);
+ continue;
+ }
for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
if (record.hasValue()) {
// put
@@ -86,6 +100,28 @@ public class OMPersistDbRequest extends OMClientRequest {
}
}
}
+ for (OzoneManagerProtocolProtos.BucketQuotaCount quota :
dbUpdateRequest.getBucketQuotaCountList()) {
+ String bucketKey = metadataManager.getBucketKey(quota.getVolName(),
quota.getBucketName());
+ // TODO remove bucket lock
+ metadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
quota.getVolName(), quota.getBucketName());
+ try {
+ OmBucketInfo bucketInfo = bucketTable.get(bucketKey);
+          if (bucketInfo instanceof OmBucketInfoQuotaTracker) {
+            bucketInfo = bucketTable.getSkipCache(bucketKey);
+          }
+          if (null == bucketInfo || bucketInfo.getObjectID() != quota.getBucketObjectId()) {
+            continue;
+          }
+          bucketInfo.incrUsedBytes(quota.getDiffUsedBytes());
+          bucketInfo.incrUsedNamespace(quota.getDiffUsedNamespace());
+          bucketTable.putWithBatch(batchOperation, bucketKey, bucketInfo);
+          bucketTable.addCacheEntry(bucketKey, bucketInfo, -1);
+          LOG.debug("Updated bucket quota {}-{} for key {}", quota.getDiffUsedBytes(),
+              quota.getDiffUsedNamespace(), quota.getBucketName());
+        } finally {
+          metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, quota.getVolName(), quota.getBucketName());
+        }
+      }
long txIndex = 0;
TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(metadataManager);
if (transactionInfo != null && transactionInfo.getIndex() != null) {
@@ -96,17 +132,59 @@ public class OMPersistDbRequest extends OMClientRequest {
batchOperation, TRANSACTION_INFO_KEY,
TransactionInfo.valueOf(termIndex, txIndex));
metadataManager.getStore().commitBatchOperation(batchOperation);
omResponse.setPersistDbResponse(OzoneManagerProtocolProtos.PersistDbResponse.newBuilder().build());
- refreshCache(ozoneManager, tableUpdatesList);
} catch (IOException ex) {
audit(ozoneManager, dbUpdateRequest, termIndex, ex);
LOG.error("Db persist exception", ex);
return new DummyOMClientResponse(createErrorOMResponse(omResponse, ex));
+ } finally {
+ bucketTable.cleanupCache(Collections.singletonList(Long.MAX_VALUE));
+ volumeTable.cleanupCache(Collections.singletonList(Long.MAX_VALUE));
}
audit(ozoneManager, dbUpdateRequest, termIndex, null);
OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse.build());
return omClientResponse;
}
+  private static void updateBucketRecord(
+      Table<String, OmBucketInfo> bucketTable, BatchOperation batchOperation,
+      List<OzoneManagerProtocolProtos.DBTableRecord> recordsList) throws IOException {
+ for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
+ String key = record.getKey().toStringUtf8();
+ if (record.hasValue()) {
+        OmBucketInfo updateInfo = OmBucketInfo.getCodec().fromPersistedFormat(record.getValue().toByteArray());
+ OmBucketInfo bucketInfo = bucketTable.getSkipCache(key);
+ if (null != bucketInfo) {
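+          // overwrite the incoming record's usage with the DB row's usage so deltas already persisted are kept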
+          updateInfo.incrUsedBytes(-updateInfo.getUsedBytes() + bucketInfo.getUsedBytes());
+          updateInfo.incrUsedNamespace(-updateInfo.getUsedNamespace() + bucketInfo.getUsedNamespace());
+ bucketTable.put(key, updateInfo);
+ } else {
+          bucketTable.getRawTable().putWithBatch(batchOperation, record.getKey().toByteArray(),
+              record.getValue().toByteArray());
+ bucketTable.addCacheEntry(key, updateInfo, -1);
+ }
+ } else {
+        bucketTable.getRawTable().deleteWithBatch(batchOperation, record.getKey().toByteArray());
+ bucketTable.addCacheEntry(key, -1);
+ }
+ }
+ }
+  private static void updateVolumeRecord(
+      Table<String, OmVolumeArgs> volumeTable, BatchOperation batchOperation,
+      List<OzoneManagerProtocolProtos.DBTableRecord> recordsList) throws IOException {
+ for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
+ String key = record.getKey().toStringUtf8();
+ if (record.hasValue()) {
+        OmVolumeArgs updateInfo = OmVolumeArgs.getCodec().fromPersistedFormat(record.getValue().toByteArray());
+        volumeTable.getRawTable().putWithBatch(batchOperation, record.getKey().toByteArray(),
+            record.getValue().toByteArray());
+ volumeTable.addCacheEntry(key, updateInfo, -1);
+ } else {
+        volumeTable.getRawTable().deleteWithBatch(batchOperation, record.getKey().toByteArray());
+ volumeTable.addCacheEntry(key, -1);
+ }
+ }
+ }
+
public void audit(OzoneManager ozoneManager,
OzoneManagerProtocolProtos.PersistDbRequest request,
TermIndex termIndex, Throwable th) {
List<Long> indexList = request.getIndexList();
@@ -121,8 +199,4 @@ public class OMPersistDbRequest extends OMClientRequest {
OMSystemAction.DBPERSIST, auditMap));
}
}
-
-  private void refreshCache(OzoneManager om, List<OzoneManagerProtocolProtos.DBTableUpdate> tblUpdateList) {
-    // TODO no-cache, update bucket and volume cache as full table cache in no-cache
- }
}
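The bucket-quota block above follows a read-reconcile-write pattern: take a per-bucket write lock, re-read the authoritative row, check the bucket's identity (objectID) so deltas for a deleted-and-recreated bucket are dropped, apply the delta, then persist. A minimal, self-contained sketch of that pattern; QuotaReconciler, BucketRow, and the in-memory store are illustrative stand-ins, not Ozone classes:

    // Illustrative sketch only (not Ozone code): mirrors the shape of the
    // quota block above -- lock the bucket, re-read the authoritative row,
    // verify identity, apply the delta, then unlock.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;

    final class QuotaReconciler {
      static final class BucketRow {
        final long objectId;   // identity; changes if the bucket is recreated
        long usedBytes;
        long usedNamespace;
        BucketRow(long objectId) { this.objectId = objectId; }
      }

      private final Map<String, BucketRow> store = new ConcurrentHashMap<>();
      private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

      /** Applies a usage delta iff the bucket still has the expected identity. */
      boolean apply(String bucketKey, long expectedObjectId, long diffBytes, long diffNamespace) {
        ReentrantLock lock = locks.computeIfAbsent(bucketKey, k -> new ReentrantLock());
        lock.lock();
        try {
          BucketRow row = store.get(bucketKey);
          if (row == null || row.objectId != expectedObjectId) {
            return false;   // bucket gone, or deleted and recreated: drop the stale delta
          }
          row.usedBytes += diffBytes;
          row.usedNamespace += diffNamespace;
          return true;
        } finally {
          lock.unlock();
        }
      }
    }

The objectID guard is what keeps the operation safe against bucket recreation: a recreated bucket gets a new objectID, so stale deltas are silently skipped, mirroring the `continue` above.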
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 88c5ad9140..d679b39dfb 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
+import org.apache.hadoop.ozone.om.ratis.execution.BucketQuotaResource;
+import org.apache.hadoop.ozone.om.ratis.execution.OmBucketInfoQuotaTracker;
import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
@@ -113,6 +115,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
public static final Logger LOG = LoggerFactory.getLogger(OMKeyRequest.class);
private BucketLayout bucketLayout = BucketLayout.DEFAULT;
+ private OmBucketInfo wrappedBucketInfo = null;
public OMKeyRequest(OMRequest omRequest) {
super(omRequest);
@@ -707,8 +710,17 @@ public abstract class OMKeyRequest extends OMClientRequest {
CacheValue<OmBucketInfo> value = omMetadataManager.getBucketTable()
.getCacheValue(new CacheKey<>(bucketKey));
+    OmBucketInfo cachedBucketInfo = value != null ? value.getCacheValue() : null;
+    if (null != cachedBucketInfo && BucketQuotaResource.instance().isEnabledTrack()) {
+ wrappedBucketInfo = OmBucketInfoQuotaTracker.convert(cachedBucketInfo);
+ return wrappedBucketInfo;
+ }
+ return cachedBucketInfo;
+ }
- return value != null ? value.getCacheValue() : null;
+ @Override
+ public OmBucketInfo getWrappedBucketInfo() {
+ return wrappedBucketInfo;
}
/**
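The getBucketInfo change above hands requests a quota-tracking wrapper (OmBucketInfoQuotaTracker) instead of the shared cached value whenever tracking is enabled, so each request can accumulate its own usage delta. A minimal sketch of that decorator idea, using made-up Value/TrackingValue classes rather than the real Ozone types:

    // Sketch only: Value/TrackingValue are stand-ins, not OmBucketInfo or
    // OmBucketInfoQuotaTracker. The wrapper records per-request deltas on
    // top of a snapshot of the cached value.
    class Value {
      private long usedBytes;
      long getUsedBytes() { return usedBytes; }
      void incrUsedBytes(long delta) { usedBytes += delta; }
    }

    class TrackingValue extends Value {
      private long diffBytes;   // delta accumulated by this request only

      TrackingValue(Value cached) {
        // seed from the cached snapshot; deliberately not counted as a tracked delta
        super.incrUsedBytes(cached.getUsedBytes());
      }

      @Override
      void incrUsedBytes(long delta) {
        super.incrUsedBytes(delta);
        diffBytes += delta;     // remembered for later reconciliation
      }

      long getDiffBytes() { return diffBytes; }
    }

At commit time only the accumulated delta needs to be shipped, which matches the BucketQuotaCount deltas that OMPersistDbRequest reconciles above.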
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 597a40006f..900dee13f6 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -216,16 +216,6 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
omBucketInfo.getDefaultReplicationConfig() :
null, ozoneManager);
- OmMultipartKeyInfo multipartKeyInfoFromArgs =
- new OmMultipartKeyInfo.Builder()
- .setUploadID(keyArgs.getMultipartUploadID())
- .setCreationTime(keyArgs.getModificationTime())
- .setReplicationConfig(replicationConfig)
- .setObjectID(pathInfoFSO.getLeafNodeObjectId())
- .setUpdateID(trxnLogIndex)
- .setParentID(pathInfoFSO.getLastKnownParentId())
- .build();
-
OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 0293d1f880..4d885d08cf 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -89,6 +89,8 @@ public class OMPrepareRequest extends OMClientRequest {
Exception exception = null;
try {
+      // enable the prepare gate on every node, including followers
+      ozoneManager.getPrepareState().enablePrepareGate();
PrepareResponse omResponse =
PrepareResponse.newBuilder().setTxnID(transactionLogIndex).build();
responseBuilder.setPrepareResponse(omResponse);
response = new DummyOMClientResponse(responseBuilder.build());
@@ -104,9 +106,25 @@ public class OMPrepareRequest extends OMClientRequest {
ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY,
TransactionInfo.valueOf(termIndex, index));
+      // notify the Ratis state machine of the current term index
OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
+      omRatisServer.getOmBasicStateMachine().notifyTermIndexUpdated(termIndex.getTerm(), termIndex.getIndex());
+      Duration flushTimeout =
+          Duration.of(omRequest.getPrepareRequest().getArgs().getTxnApplyWaitTimeoutSeconds(), ChronoUnit.SECONDS);
+      Duration flushInterval =
+          Duration.of(omRequest.getPrepareRequest().getArgs().getTxnApplyCheckIntervalSeconds(), ChronoUnit.SECONDS);
+ long endTime = System.currentTimeMillis() + flushTimeout.toMillis();
+      while (omRatisServer.getOmBasicStateMachine().getLastAppliedTermIndex().getIndex() < (transactionLogIndex + 1)
+          && System.currentTimeMillis() < endTime) {
+ Thread.sleep(flushInterval.toMillis());
+ }
+      if (omRatisServer.getOmBasicStateMachine().getLastAppliedTermIndex().getIndex() < (transactionLogIndex + 1)) {
+        throw new IOException(String.format("After waiting for %d seconds, Ratis state machine applied index %d " +
+            "which is less than the minimum required index %d.", flushTimeout.getSeconds(),
+            omRatisServer.getOmBasicStateMachine().getLastAppliedTermIndex().getIndex(), (transactionLogIndex + 1)));
+ }
final RaftServer.Division division = omRatisServer.getServerDivision();
- takeSnapshotAndPurgeLogs(transactionLogIndex, division);
+ takeSnapshotAndPurgeLogs(transactionLogIndex + 1, division);
// Save prepare index to a marker file, so if the OM restarts,
// it will remain in prepare mode as long as the file exists and its
@@ -119,7 +137,7 @@ public class OMPrepareRequest extends OMClientRequest {
exception = e;
LOG.error("Prepare Request Apply failed in {}. ",
ozoneManager.getOMNodeId(), e);
response = new
DummyOMClientResponse(createErrorOMResponse(responseBuilder, e));
- } catch (IOException e) {
+ } catch (InterruptedException | IOException e) {
// Set error code so that prepare failure does not cause the OM to terminate.
exception = e;
LOG.error("Prepare Request Apply failed in {}. ",
ozoneManager.getOMNodeId(), e);
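The prepare path above now blocks until the state machine has applied the prepare transaction itself (transactionLogIndex + 1) before taking a snapshot and purging logs. A generic sketch of that bounded-poll pattern; ApplyWaiter and the LongSupplier argument are illustrative, not Ozone APIs:

    // Sketch of a bounded wait for a monotonically increasing applied index.
    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.function.LongSupplier;

    final class ApplyWaiter {
      /** Polls appliedIndex until it reaches target, or fails after timeout. */
      static void waitUntilApplied(LongSupplier appliedIndex, long target,
          Duration timeout, Duration interval) throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (appliedIndex.getAsLong() < target && System.currentTimeMillis() < deadline) {
          Thread.sleep(interval.toMillis());   // re-check at the configured cadence
        }
        if (appliedIndex.getAsLong() < target) {
          throw new TimeoutException("Applied index " + appliedIndex.getAsLong()
              + " did not reach " + target + " within " + timeout.getSeconds() + "s");
        }
      }
    }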
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 723aaa7bc4..43c855ebda 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -302,7 +302,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements OzoneManagerP
/**
* Submits request directly to OM.
*/
- private OMResponse submitRequestDirectlyToOM(OMRequest request) {
+  private OMResponse submitRequestDirectlyToOM(OMRequest request) throws ServiceException {
final OMClientResponse omClientResponse;
try {
if (OmUtils.isReadOnly(request)) {
@@ -317,6 +317,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements OzoneManagerP
OMAuditLogger.log(omClientRequest.getAuditBuilder());
throw ex;
}
+ if (ozoneManager.isLeaderExecutorEnabled()) {
+ return submitRequestToRatis(request);
+ }
final TermIndex termIndex = TransactionInfo.getTermIndex(transactionIndex.incrementAndGet());
omClientResponse = handler.handleWriteRequest(request, termIndex, ozoneManagerDoubleBuffer);
}
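With the change above, a write arriving at the translator can be re-routed through Ratis even on the "direct to OM" path when the leader executor is enabled. A rough sketch of the resulting three-way routing; only the leader-executor flag and the Ratis submission correspond to names in the hunk, everything else is an assumed stand-in:

    // Rough routing sketch; all handler methods here are illustrative stubs.
    final class RequestRouter {
      interface Request { boolean readOnly(); }
      interface Response { }

      private final boolean leaderExecutorEnabled;
      RequestRouter(boolean leaderExecutorEnabled) { this.leaderExecutorEnabled = leaderExecutorEnabled; }

      Response route(Request request) {
        if (request.readOnly()) {
          return handleRead(request);          // reads are served locally, never via Ratis
        }
        if (leaderExecutorEnabled) {
          return submitToRatis(request);       // new leader-executor path
        }
        return handleWriteDirectly(request);   // legacy double-buffer write path
      }

      private Response handleRead(Request r) { return new Response() { }; }
      private Response submitToRatis(Request r) { return new Response() { }; }
      private Response handleWriteDirectly(Request r) { return new Response() { }; }
    }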
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 04e8efa7b7..1b022747ac 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -211,6 +211,6 @@ public class TestDirectoryDeletingService {
"base: " + delDirCnt[0] + ", new: " + delDirCnt[1]);
delDirCnt[0] = delDirCnt[1];
return dirDeletingService.getDeletedDirsCount() >= dirCreatesCount;
- }, 500, 300000);
+ }, 100, 300000);
}
}
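For reference, the waitFor arguments above are (check, checkEveryMillis, waitForMillis), so the test now polls every 100 ms instead of 500 ms within the same 300-second timeout. A self-contained sketch of such a polling helper, not the actual GenericTestUtils implementation:

    // Sketch of a poll-until-true test helper with a hard deadline.
    import java.util.concurrent.TimeoutException;
    import java.util.function.BooleanSupplier;

    final class PollUtil {
      /** Re-evaluates 'check' every pollMillis until true, failing after timeoutMillis. */
      static void waitFor(BooleanSupplier check, long pollMillis, long timeoutMillis)
          throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!check.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException("Condition not met within " + timeoutMillis + " ms");
          }
          Thread.sleep(pollMillis);
        }
      }
    }

A shorter poll interval makes the test react sooner once the deleting service catches up, at the cost of slightly more frequent checks.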
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]