This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 56062d39061 HDDS-13764. KeyDeletingService and DirectoryDeletingService should reduce snapshot bucket quota usage (#9122)
56062d39061 is described below
commit 56062d39061d204d1176a4c3e6158b1eea6e9ede
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Oct 9 14:51:19 2025 -0400
HDDS-13764. KeyDeletingService and DirectoryDeletingService should reduce snapshot bucket quota usage (#9122)
---
.../apache/hadoop/ozone/util/ProtobufUtils.java | 4 +
.../org/apache/hadoop/ozone/om/TestKeyPurging.java | 2 +-
.../src/main/proto/OmClientProtocol.proto | 7 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 23 ++-
.../hadoop/ozone/om/PendingKeysDeletion.java | 89 +++++++--
.../key/OMDirectoriesPurgeRequestWithFSO.java | 20 ++-
.../ozone/om/request/key/OMKeyPurgeRequest.java | 65 ++++++-
.../ozone/om/response/key/OMKeyPurgeResponse.java | 11 +-
.../ozone/om/service/KeyDeletingService.java | 200 +++++++++++++++------
.../TestOMDirectoriesPurgeRequestAndResponse.java | 27 ++-
.../key/TestOMKeyPurgeRequestAndResponse.java | 4 +-
.../ozone/om/service/TestKeyDeletingService.java | 99 ++++++----
12 files changed, 427 insertions(+), 124 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
index 10c3669a94e..05d2b116d26 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java
@@ -46,4 +46,8 @@ public static UUID fromProtobuf(HddsProtos.UUID proto) {
public static int computeRepeatedStringSize(String value) {
return CodedOutputStream.computeStringSizeNoTag(value);
}
+
+ public static int computeLongSizeWithTag(int fieldNumber, long value) {
+ return CodedOutputStream.computeInt64Size(fieldNumber, value);
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index fa59754b67f..2a09ffc5ddc 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -127,7 +127,7 @@ public void testKeysPurgingByKeyDeletingService() throws
Exception {
() -> {
try {
return keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE)
- .getKeyBlocksList().isEmpty();
+ .getPurgedKeys().isEmpty();
} catch (IOException e) {
return false;
}
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 2329da73c64..61a3c1d6792 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1417,6 +1417,13 @@ message PurgeKeysRequest {
// previous snapshotID can also be null & this field would be absent in older requests.
optional NullableUUID expectedPreviousSnapshotID = 4;
repeated string renamedKeys = 5;
+ repeated BucketPurgeKeysSize bucketPurgeKeysSize = 6;
+}
+
+message BucketPurgeKeysSize {
+ optional BucketNameInfo bucketNameInfo = 1;
+ optional uint64 purgedBytes = 2;
+ optional uint64 purgedNamespace = 3;
}
message PurgeKeysResponse {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 8c50232355c..02949d5ee74 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -80,7 +80,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.security.GeneralSecurityException;
@@ -138,6 +138,7 @@
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -161,6 +162,7 @@
import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
import org.apache.hadoop.ozone.om.service.CompactionService;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
@@ -741,9 +743,8 @@ public PendingKeysDeletion getPendingDeletionKeys(
String volume, String bucket, String startKey,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter,
int count) throws IOException {
- List<BlockGroup> keyBlocksList = Lists.newArrayList();
+ Map<String, PurgedKey> purgedKeys = Maps.newHashMap();
Map<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
- Map<String, Long> keyBlockReplicatedSize = new HashMap<>();
int notReclaimableKeyCount = 0;
// Bucket prefix would be empty if volume is empty i.e. either null or "".
@@ -762,9 +763,11 @@ public PendingKeysDeletion getPendingDeletionKeys(
KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
if (kv != null) {
RepeatedOmKeyInfo notReclaimableKeyInfo = new
RepeatedOmKeyInfo(kv.getValue().getBucketId());
- List<BlockGroup> blockGroupList = Lists.newArrayList();
+ Map<String, PurgedKey> reclaimableKeys = Maps.newHashMap();
// Multiple keys with the same path can be queued in one DB entry
RepeatedOmKeyInfo infoList = kv.getValue();
+ long bucketId = infoList.getBucketId();
+ int reclaimableKeyCount = 0;
for (OmKeyInfo info : infoList.getOmKeyInfoList()) {
// Skip the key if the filter doesn't allow the file to be deleted.
@@ -772,10 +775,12 @@ public PendingKeysDeletion getPendingDeletionKeys(
List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
.flatMap(versionLocations ->
versionLocations.getLocationList().stream()
.map(b -> new BlockID(b.getContainerID(),
b.getLocalID()))).collect(Collectors.toList());
- BlockGroup keyBlocks =
BlockGroup.newBuilder().setKeyName(kv.getKey())
+ String blockGroupName = kv.getKey() + "/" +
reclaimableKeyCount++;
+ BlockGroup keyBlocks =
BlockGroup.newBuilder().setKeyName(blockGroupName)
.addAllBlockIDs(blockIDS).build();
- keyBlockReplicatedSize.put(keyBlocks.getGroupID(),
info.getReplicatedSize());
- blockGroupList.add(keyBlocks);
+ reclaimableKeys.put(blockGroupName,
+ new PurgedKey(info.getVolumeName(), info.getBucketName(),
bucketId,
+ keyBlocks, kv.getKey(), OMKeyRequest.sumBlockLengths(info),
info.isDeletedKeyCommitted()));
currentCount++;
} else {
notReclaimableKeyInfo.addOmKeyInfo(info);
@@ -789,12 +794,12 @@ public PendingKeysDeletion getPendingDeletionKeys(
notReclaimableKeyInfoList.size() !=
infoList.getOmKeyInfoList().size()) {
keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
}
- keyBlocksList.addAll(blockGroupList);
+ purgedKeys.putAll(reclaimableKeys);
notReclaimableKeyCount += notReclaimableKeyInfoList.size();
}
}
}
- return new PendingKeysDeletion(keyBlocksList, keysToModify,
keyBlockReplicatedSize, notReclaimableKeyCount);
+ return new PendingKeysDeletion(purgedKeys, keysToModify,
notReclaimableKeyCount);
}
private <V, R> List<KeyValue<String, R>> getTableEntries(String startKey,
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
index ab8c4dda167..1071705b5eb 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.om;
-import java.util.List;
import java.util.Map;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -35,18 +34,15 @@
*/
public class PendingKeysDeletion {
- private Map<String, RepeatedOmKeyInfo> keysToModify;
- private List<BlockGroup> keyBlocksList;
- private Map<String, Long> keyBlockReplicatedSize;
+ private final Map<String, RepeatedOmKeyInfo> keysToModify;
+ private final Map<String, PurgedKey> purgedKeys;
private int notReclaimableKeyCount;
- public PendingKeysDeletion(List<BlockGroup> keyBlocksList,
- Map<String, RepeatedOmKeyInfo> keysToModify,
- Map<String, Long> keyBlockReplicatedSize,
- int notReclaimableKeyCount) {
+ public PendingKeysDeletion(Map<String, PurgedKey> purgedKeys,
+ Map<String, RepeatedOmKeyInfo> keysToModify,
+ int notReclaimableKeyCount) {
this.keysToModify = keysToModify;
- this.keyBlocksList = keyBlocksList;
- this.keyBlockReplicatedSize = keyBlockReplicatedSize;
+ this.purgedKeys = purgedKeys;
this.notReclaimableKeyCount = notReclaimableKeyCount;
}
@@ -54,12 +50,77 @@ public Map<String, RepeatedOmKeyInfo> getKeysToModify() {
return keysToModify;
}
- public List<BlockGroup> getKeyBlocksList() {
- return keyBlocksList;
+ public Map<String, PurgedKey> getPurgedKeys() {
+ return purgedKeys;
}
- public Map<String, Long> getKeyBlockReplicatedSize() {
- return keyBlockReplicatedSize;
+ /**
+ * Represents metadata for a key that has been purged.
+ *
+ * This class holds information about a specific purged key,
+ * including its volume, bucket, associated block group,
+ * and the amount of data purged in bytes.
+ */
+ public static class PurgedKey {
+ private final String volume;
+ private final String bucket;
+ private final long bucketId;
+ private final BlockGroup blockGroup;
+ private final long purgedBytes;
+ private final boolean isCommittedKey;
+ private final String deleteKeyName;
+
+ public PurgedKey(String volume, String bucket, long bucketId, BlockGroup
group, String deleteKeyName,
+ long purgedBytes, boolean isCommittedKey) {
+ this.volume = volume;
+ this.bucket = bucket;
+ this.bucketId = bucketId;
+ this.blockGroup = group;
+ this.purgedBytes = purgedBytes;
+ this.isCommittedKey = isCommittedKey;
+ this.deleteKeyName = deleteKeyName;
+ }
+
+ public BlockGroup getBlockGroup() {
+ return blockGroup;
+ }
+
+ public long getPurgedBytes() {
+ return purgedBytes;
+ }
+
+ public String getVolume() {
+ return volume;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public long getBucketId() {
+ return bucketId;
+ }
+
+ public boolean isCommittedKey() {
+ return isCommittedKey;
+ }
+
+ public String getDeleteKeyName() {
+ return deleteKeyName;
+ }
+
+ @Override
+ public String toString() {
+ return "PurgedKey{" +
+ "blockGroup=" + blockGroup +
+ ", volume='" + volume + '\'' +
+ ", bucket='" + bucket + '\'' +
+ ", bucketId=" + bucketId +
+ ", purgedBytes=" + purgedBytes +
+ ", isCommittedKey=" + isCommittedKey +
+ ", deleteKeyName='" + deleteKeyName + '\'' +
+ '}';
+ }
}
public int getNotReclaimableKeyCount() {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index 991b16480b4..adbbf58a98f 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
@@ -41,6 +42,7 @@
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.OMSystemAction;
import org.apache.hadoop.ozone.om.DeletingServiceMetrics;
+import org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -55,6 +57,7 @@
import
org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeDirectoriesRequest;
@@ -132,9 +135,12 @@ public OMClientResponse
validateAndUpdateCache(OzoneManager ozoneManager, Execut
}
try {
int numSubDirMoved = 0, numSubFilesMoved = 0, numDirsDeleted = 0;
+ Map<VolumeBucketId, BucketNameInfo> volumeBucketIdMap =
purgeDirsRequest.getBucketNameInfosList().stream()
+ .collect(Collectors.toMap(bucketNameInfo ->
+ new VolumeBucketId(bucketNameInfo.getVolumeId(),
bucketNameInfo.getBucketId()),
+ Function.identity()));
for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
- for (OzoneManagerProtocolProtos.KeyInfo key :
- path.getMarkDeletedSubDirsList()) {
+ for (OzoneManagerProtocolProtos.KeyInfo key :
path.getMarkDeletedSubDirsList()) {
ProcessedKeyInfo processed = processDeleteKey(key, path,
omMetadataManager);
subDirNames.add(processed.deleteKey);
@@ -154,8 +160,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager
ozoneManager, Execut
}
}
- for (OzoneManagerProtocolProtos.KeyInfo key :
- path.getDeletedSubFilesList()) {
+ for (OzoneManagerProtocolProtos.KeyInfo key :
path.getDeletedSubFilesList()) {
ProcessedKeyInfo processed = processDeleteKey(key, path,
omMetadataManager);
subFileNames.add(processed.deleteKey);
@@ -194,6 +199,13 @@ public OMClientResponse
validateAndUpdateCache(OzoneManager ozoneManager, Execut
}
if (path.hasDeletedDir()) {
deletedDirNames.add(path.getDeletedDir());
+ BucketNameInfo bucketNameInfo = volumeBucketIdMap.get(new
VolumeBucketId(path.getVolumeId(),
+ path.getBucketId()));
+ OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
+ bucketNameInfo.getVolumeName(), bucketNameInfo.getBucketName());
+ if (omBucketInfo != null && omBucketInfo.getObjectID() ==
path.getBucketId()) {
+ omBucketInfo.purgeSnapshotUsedNamespace(1);
+ }
numDirsDeleted++;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index a4cc9fe2f7a..b543c3cfbf6 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.ozone.om.request.key;
import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.BUCKET_LOCK;
import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.validatePreviousSnapshotId;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -33,15 +35,19 @@
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.OMSystemAction;
import org.apache.hadoop.ozone.om.DeletingServiceMetrics;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketPurgeKeysSize;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -161,9 +167,64 @@ public OMClientResponse
validateAndUpdateCache(OzoneManager ozoneManager, Execut
}
return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, e));
}
+ try {
+ List<OmBucketInfo> bucketInfoList =
updateBucketSize(purgeKeysRequest.getBucketPurgeKeysSizeList(),
+ omMetadataManager);
+ return new OMKeyPurgeResponse(omResponse.build(),
+ keysToBePurgedList, renamedKeysToBePurged, fromSnapshotInfo,
keysToUpdateList, bucketInfoList);
+ } catch (OMException oe) {
+
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.KEY_DELETION,
null, oe));
+ return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, oe));
+ }
+ }
- return new OMKeyPurgeResponse(omResponse.build(),
- keysToBePurgedList, renamedKeysToBePurged, fromSnapshotInfo,
keysToUpdateList);
+ private List<OmBucketInfo> updateBucketSize(List<BucketPurgeKeysSize>
bucketPurgeKeysSizeList,
+ OMMetadataManager omMetadataManager) throws OMException {
+ Map<String, Map<String, List<BucketPurgeKeysSize>>> bucketPurgeKeysSizes =
new HashMap<>();
+ List<String[]> bucketKeyList = new ArrayList<>();
+ for (BucketPurgeKeysSize bucketPurgeKey : bucketPurgeKeysSizeList) {
+ String volumeName = bucketPurgeKey.getBucketNameInfo().getVolumeName();
+ String bucketName = bucketPurgeKey.getBucketNameInfo().getBucketName();
+ bucketPurgeKeysSizes.computeIfAbsent(volumeName, k -> new HashMap<>())
+ .computeIfAbsent(bucketName, k -> {
+ bucketKeyList.add(new String[]{volumeName, bucketName});
+ return new ArrayList<>();
+ }).add(bucketPurgeKey);
+ }
+
mergeOmLockDetails(omMetadataManager.getLock().acquireWriteLocks(BUCKET_LOCK,
bucketKeyList));
+ boolean acquiredLock = getOmLockDetails().isLockAcquired();
+ if (!acquiredLock) {
+ throw new OMException("Failed to acquire bucket lock for purging keys.",
+ OMException.ResultCodes.KEY_DELETION_ERROR);
+ }
+ List<OmBucketInfo> bucketInfoList = new ArrayList<>();
+ try {
+ for (Map.Entry<String, Map<String, List<BucketPurgeKeysSize>>> volEntry
: bucketPurgeKeysSizes.entrySet()) {
+ String volumeName = volEntry.getKey();
+ for (Map.Entry<String, List<BucketPurgeKeysSize>> bucketEntry :
volEntry.getValue().entrySet()) {
+ String bucketName = bucketEntry.getKey();
+ OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
+ // Check null if bucket has been deleted.
+ if (omBucketInfo != null) {
+ boolean bucketUpdated = false;
+ for (BucketPurgeKeysSize bucketPurgeKeysSize :
bucketEntry.getValue()) {
+ BucketNameInfo bucketNameInfo =
bucketPurgeKeysSize.getBucketNameInfo();
+ if (bucketNameInfo.getBucketId() == omBucketInfo.getObjectID()) {
+
omBucketInfo.purgeSnapshotUsedBytes(bucketPurgeKeysSize.getPurgedBytes());
+
omBucketInfo.purgeSnapshotUsedNamespace(bucketPurgeKeysSize.getPurgedNamespace());
+ bucketUpdated = true;
+ }
+ }
+ if (bucketUpdated) {
+ bucketInfoList.add(omBucketInfo.copyObject());
+ }
+ }
+ }
+ }
+ return bucketInfoList;
+ } finally {
+
mergeOmLockDetails(omMetadataManager.getLock().releaseWriteLocks(BUCKET_LOCK,
bucketKeyList));
+ }
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index 0ceb5146282..38ce0a6266c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -23,6 +23,7 @@
import jakarta.annotation.Nonnull;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -30,6 +31,7 @@
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
@@ -44,6 +46,7 @@
*/
@CleanupTableInfo(cleanupTables = {DELETED_TABLE, SNAPSHOT_INFO_TABLE})
public class OMKeyPurgeResponse extends OmKeyResponse {
+ private List<OmBucketInfo> bucketInfosToBeUpdated;
private List<String> purgeKeyList;
private List<String> renamedList;
private SnapshotInfo fromSnapshot;
@@ -53,12 +56,14 @@ public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
@Nonnull List<String> keyList,
@Nonnull List<String> renamedList,
SnapshotInfo fromSnapshot,
- List<SnapshotMoveKeyInfos> keysToUpdate) {
+ List<SnapshotMoveKeyInfos> keysToUpdate,
+ List<OmBucketInfo> bucketInfosToBeUpdated) {
super(omResponse);
this.purgeKeyList = keyList;
this.renamedList = renamedList;
this.fromSnapshot = fromSnapshot;
this.keysToUpdateList = keysToUpdate;
+ this.bucketInfosToBeUpdated = bucketInfosToBeUpdated == null ?
Collections.emptyList() : bucketInfosToBeUpdated;
}
/**
@@ -96,6 +101,10 @@ public void addToDBBatch(OMMetadataManager
omMetadataManager,
processKeys(batchOperation, omMetadataManager);
processKeysToUpdate(batchOperation, omMetadataManager);
}
+ for (OmBucketInfo bucketInfo : bucketInfosToBeUpdated) {
+ String bucketKey =
omMetadataManager.getBucketKey(bucketInfo.getVolumeName(),
bucketInfo.getBucketName());
+ omMetadataManager.getBucketTable().putWithBatch(batchOperation,
bucketKey, bucketInfo);
+ }
}
private void processKeysToUpdate(BatchOperation batchOp,
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index d7dba697b71..254184f8b82 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -19,6 +19,7 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.util.ProtobufUtils.computeLongSizeWithTag;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -27,6 +28,7 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -48,7 +50,6 @@
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.ClientVersion;
-import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -57,6 +58,7 @@
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.PendingKeysDeletion;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -65,6 +67,8 @@
import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableRenameEntryFilter;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketPurgeKeysSize;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.NullableUUID;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
@@ -131,10 +135,9 @@ public AtomicLong getDeletedKeyCount() {
return deletedKeyCount;
}
- Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(List<BlockGroup>
keyBlocksList,
+ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(Map<String, PurgedKey>
keyBlocksList,
Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntries,
- String snapTableKey, UUID expectedPreviousSnapshotId, Map<String, Long>
keyBlockReplicatedSize)
- throws IOException, InterruptedException {
+ String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException
{
long startTime = Time.monotonicNow();
Pair<Pair<Integer, Long>, Boolean> purgeResult = Pair.of(Pair.of(0, 0L),
false);
if (LOG.isDebugEnabled()) {
@@ -146,16 +149,18 @@ Pair<Pair<Integer, Long>, Boolean>
processKeyDeletes(List<BlockGroup> keyBlocksL
logSize = keyBlocksList.size();
}
LOG.info("Send {} key(s) to SCM, first {} keys: {}",
- keyBlocksList.size(), logSize, keyBlocksList.subList(0, logSize));
+ keyBlocksList.size(), logSize,
keyBlocksList.entrySet().stream().limit(logSize)
+ .map(Map.Entry::getValue).collect(Collectors.toSet()));
}
List<DeleteBlockGroupResult> blockDeletionResults =
- scmClient.deleteKeyBlocks(keyBlocksList);
+ scmClient.deleteKeyBlocks(keyBlocksList.values().stream()
+ .map(PurgedKey::getBlockGroup).collect(Collectors.toList()));
LOG.info("{} BlockGroup deletion are acked by SCM in {} ms",
keyBlocksList.size(), Time.monotonicNow() - startTime);
if (blockDeletionResults != null) {
long purgeStartTime = Time.monotonicNow();
- purgeResult = submitPurgeKeysRequest(blockDeletionResults, keysToModify,
renameEntries, snapTableKey,
- expectedPreviousSnapshotId, ratisByteLimit, keyBlockReplicatedSize);
+ purgeResult = submitPurgeKeysRequest(blockDeletionResults,
keyBlocksList, keysToModify, renameEntries,
+ snapTableKey, expectedPreviousSnapshotId, ratisByteLimit);
int limit =
getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK,
OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms.
Limit per task is {}.",
@@ -165,6 +170,60 @@ Pair<Pair<Integer, Long>, Boolean>
processKeyDeletes(List<BlockGroup> keyBlocksL
return purgeResult;
}
+ private static final class BucketPurgeSize {
+ private BucketNameInfo bucket;
+ private long purgedBytes;
+ private long purgedNamespace;
+
+ private BucketPurgeSize(String volume, String bucket, long bucketId) {
+ this.bucket =
BucketNameInfo.newBuilder().setBucketId(bucketId).setVolumeName(volume)
+ .setBucketName(bucket).build();
+ this.purgedBytes = 0;
+ this.purgedNamespace = 0;
+ }
+
+ private BucketPurgeSize incrementPurgedBytes(long bytes) {
+ purgedBytes += bytes;
+ return this;
+ }
+
+ private BucketPurgeSize incrementPurgedNamespace(long namespace) {
+ purgedNamespace += namespace;
+ return this;
+ }
+
+ private BucketPurgeKeysSize toProtobuf() {
+ return BucketPurgeKeysSize.newBuilder()
+ .setBucketNameInfo(bucket)
+ .setPurgedBytes(purgedBytes)
+ .setPurgedNamespace(purgedNamespace)
+ .build();
+ }
+
+ private int getEstimatedSize() {
+ // Using -10 as the placeholder to get max size i.e. 10 bytes to store the long value in protobuf.
+ // Field number 2 in BucketPurgeKeysSize proto corresponds to purgedBytes.
+ return this.bucket.getSerializedSize() + computeLongSizeWithTag(2, -10)
+ // Field number 3 in BucketPurgeKeysSize proto corresponds to purgedNamespace.
+ + computeLongSizeWithTag(3, -10);
+ }
+ }
+
+ private int increaseBucketPurgeSize(Map<Long, BucketPurgeSize>
bucketPurgeSizeMap, PurgedKey purgedKey) {
+ BucketPurgeSize bucketPurgeSize;
+ int estimatedSize = 0;
+ if (!bucketPurgeSizeMap.containsKey(purgedKey.getBucketId())) {
+ bucketPurgeSize =
bucketPurgeSizeMap.computeIfAbsent(purgedKey.getBucketId(),
+ (bucketId) -> new BucketPurgeSize(purgedKey.getVolume(),
purgedKey.getBucket(),
+ purgedKey.getBucketId()));
+ estimatedSize = bucketPurgeSize.getEstimatedSize();
+ } else {
+ bucketPurgeSize = bucketPurgeSizeMap.get(purgedKey.getBucketId());
+ }
+
bucketPurgeSize.incrementPurgedBytes(purgedKey.getPurgedBytes()).incrementPurgedNamespace(1);
+ return estimatedSize;
+ }
+
/**
* Submits PurgeKeys request for the keys whose blocks have been deleted
* by SCM.
@@ -174,14 +233,14 @@ Pair<Pair<Integer, Long>, Boolean>
processKeyDeletes(List<BlockGroup> keyBlocksL
@SuppressWarnings("checkstyle:MethodLength")
private Pair<Pair<Integer, Long>, Boolean> submitPurgeKeysRequest(
List<DeleteBlockGroupResult> results,
+ Map<String, PurgedKey> purgedKeys,
Map<String, RepeatedOmKeyInfo> keysToModify,
List<String> renameEntriesToBeDeleted,
String snapTableKey,
UUID expectedPreviousSnapshotId,
- int ratisLimit,
- Map<String, Long> keyBlockReplicatedSize) throws InterruptedException {
+ int ratisLimit) {
- List<String> purgeKeys = new ArrayList<>();
+ Set<String> completePurgedKeys = new HashSet<>();
// Put all keys to be purged in a list
int deletedCount = 0;
@@ -191,38 +250,50 @@ private Pair<Pair<Integer, Long>, Boolean>
submitPurgeKeysRequest(
// Step 1: Process DeleteBlockGroupResults
for (DeleteBlockGroupResult result : results) {
- String deletedKey = result.getObjectKey();
- if (result.isSuccess()) {
- // Add key to PurgeKeys list.
- if (keysToModify != null && !keysToModify.containsKey(deletedKey)) {
- // Parse Volume and BucketName
- purgeKeys.add(deletedKey);
- if (LOG.isDebugEnabled()) {
+ String deletedKeyGroup = result.getObjectKey();
+ PurgedKey purgedKey = purgedKeys.get(deletedKeyGroup);
+ if (purgedKey != null) {
+ String deletedKeyName = purgedKey.getDeleteKeyName();
+ if (result.isSuccess()) {
+ // Add key to PurgeKeys list.
+ if (keysToModify == null ||
!keysToModify.containsKey(deletedKeyName)) {
+ completePurgedKeys.add(deletedKeyName);
+ LOG.debug("Key {} set to be purged from OM DB", deletedKeyName);
+ } else {
LOG.debug("Key {} set to be updated in OM DB, Other versions " +
- "of the key that are reclaimable are reclaimed.", deletedKey);
+ "of the key that are reclaimable are reclaimed.",
deletedKeyName);
}
- } else if (keysToModify == null) {
- purgeKeys.add(deletedKey);
+ deletedReplSize += purgedKey.getPurgedBytes();
+ deletedCount++;
+ } else {
+ // If the block deletion failed, then the deleted keys should also not be modified and
+ // any other version of the key should also not be purged.
+ failedDeletedKeys.add(deletedKeyName);
+ purgeSuccess = false;
if (LOG.isDebugEnabled()) {
- LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+ LOG.error("Failed Block Delete corresponding to Key {} with block
result : {}.", deletedKeyName,
+ result.getBlockResultList());
+ } else {
+ LOG.error("Failed Block Delete corresponding to Key {}.",
deletedKeyName);
}
}
- if (keyBlockReplicatedSize != null) {
- deletedReplSize += keyBlockReplicatedSize.getOrDefault(deletedKey,
0L);
- }
- deletedCount++;
} else {
- // If the block deletion failed, then the deleted keys should also not be modified.
- failedDeletedKeys.add(deletedKey);
- purgeSuccess = false;
+ LOG.error("Key {} not found in the list of keys to be purged." +
+ " Skipping purge for this entry. Result of delete blocks : {}",
deletedKeyGroup, result.isSuccess());
}
}
+ // Filter out the key even if one version of the key purge has failed. This is to prevent orphan blocks, and
+ // this needs to be retried.
+ completePurgedKeys = completePurgedKeys.stream()
+ .filter(i ->
!failedDeletedKeys.contains(i)).collect(Collectors.toSet());
+ // Filter out any keys that have failed and sort the purge keys based on volume and bucket.
+ List<PurgedKey> purgedKeyList = purgedKeys.values().stream()
+ .filter(purgedKey ->
!failedDeletedKeys.contains(purgedKey.getDeleteKeyName()))
+ .collect(Collectors.toList());
- // Step 2: Prepare keysToUpdateList
List<OzoneManagerProtocolProtos.SnapshotMoveKeyInfos> keysToUpdateList =
new ArrayList<>();
if (keysToModify != null) {
- for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
- keysToModify.entrySet()) {
+ for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
keysToModify.entrySet()) {
if (failedDeletedKeys.contains(keyToModify.getKey())) {
continue;
}
@@ -239,7 +310,7 @@ private Pair<Pair<Integer, Long>, Boolean>
submitPurgeKeysRequest(
}
}
- if (purgeKeys.isEmpty() && keysToUpdateList.isEmpty() &&
+ if (purgedKeyList.isEmpty() && keysToUpdateList.isEmpty() &&
(renameEntriesToBeDeleted == null ||
renameEntriesToBeDeleted.isEmpty())) {
return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess);
}
@@ -249,20 +320,33 @@ private Pair<Pair<Integer, Long>, Boolean>
submitPurgeKeysRequest(
int currSize = requestBuilder.build().getSerializedSize();
int baseSize = currSize;
- while (purgeKeyIndex < purgeKeys.size() || updateIndex <
keysToUpdateList.size() ||
+ OzoneManagerProtocolProtos.DeletedKeys.Builder bucketDeleteKeys = null;
+ Map<Long, BucketPurgeSize> bucketPurgeKeysSizeMap = new HashMap<>();
+
+ Map<String, List<PurgedKey>> modifiedKeyPurgedKeys = new HashMap<>();
+ while (purgeKeyIndex < purgedKeyList.size() || updateIndex <
keysToUpdateList.size() ||
(renameEntriesToBeDeleted != null && renameIndex <
renameEntriesToBeDeleted.size())) {
// 3.1 Purge keys (one at a time)
- if (purgeKeyIndex < purgeKeys.size()) {
- String nextKey = purgeKeys.get(purgeKeyIndex);
- int estimatedKeySize =
ProtobufUtils.computeRepeatedStringSize(nextKey);
-
- requestBuilder.addDeletedKeys(
-
OzoneManagerProtocolProtos.DeletedKeys.newBuilder().setVolumeName("").setBucketName("").addKeys(nextKey)
- .build());
- currSize += estimatedKeySize;
+ if (purgeKeyIndex < purgedKeyList.size()) {
+ PurgedKey purgedKey = purgedKeyList.get(purgeKeyIndex);
+ if (bucketDeleteKeys == null) {
+ bucketDeleteKeys =
OzoneManagerProtocolProtos.DeletedKeys.newBuilder().setVolumeName("").setBucketName("");
+ currSize += bucketDeleteKeys.buildPartial().getSerializedSize();
+ }
+ String deletedKey = purgedKey.getDeleteKeyName();
+ // Add to purge keys only if there are no other version of key that needs to be retained.
+ if (completePurgedKeys.contains(deletedKey)) {
+ bucketDeleteKeys.addKeys(deletedKey);
+ int estimatedKeySize =
ProtobufUtils.computeRepeatedStringSize(deletedKey);
+ currSize += estimatedKeySize;
+ if (purgedKey.isCommittedKey()) {
+ currSize += increaseBucketPurgeSize(bucketPurgeKeysSizeMap,
purgedKey);
+ }
+ } else if (purgedKey.isCommittedKey()) {
+ modifiedKeyPurgedKeys.computeIfAbsent(deletedKey, k -> new
ArrayList<>()).add(purgedKey);
+ }
purgeKeyIndex++;
-
} else if (updateIndex < keysToUpdateList.size()) {
// 3.2 Add keysToUpdate
OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate =
keysToUpdateList.get(updateIndex);
@@ -270,6 +354,13 @@ private Pair<Pair<Integer, Long>, Boolean>
submitPurgeKeysRequest(
int estimatedSize = nextUpdate.getSerializedSize();
requestBuilder.addKeysToUpdate(nextUpdate);
+ if (modifiedKeyPurgedKeys.containsKey(nextUpdate.getKey())) {
+ for (PurgedKey purgedKey :
modifiedKeyPurgedKeys.get(nextUpdate.getKey())) {
+ if (purgedKey.isCommittedKey()) {
+ currSize += increaseBucketPurgeSize(bucketPurgeKeysSizeMap,
purgedKey);
+ }
+ }
+ }
currSize += estimatedSize;
updateIndex++;
@@ -285,10 +376,17 @@ private Pair<Pair<Integer, Long>, Boolean>
submitPurgeKeysRequest(
}
// Flush either when limit is hit, or at the very end if items remain
- boolean allDone = purgeKeyIndex == purgeKeys.size() && updateIndex ==
keysToUpdateList.size() &&
+ boolean allDone = purgeKeyIndex == purgedKeyList.size() && updateIndex
== keysToUpdateList.size() &&
(renameEntriesToBeDeleted == null || renameIndex ==
renameEntriesToBeDeleted.size());
- if (currSize >= ratisLimit || (allDone &&
hasPendingItems(requestBuilder))) {
+ if (currSize >= ratisLimit || (allDone &&
(hasPendingItems(requestBuilder) || bucketDeleteKeys != null))) {
+ if (bucketDeleteKeys != null) {
+ requestBuilder.addDeletedKeys(bucketDeleteKeys.build());
+ bucketDeleteKeys = null;
+ }
+
bucketPurgeKeysSizeMap.values().stream().map(BucketPurgeSize::toProtobuf)
+ .forEach(requestBuilder::addBucketPurgeKeysSize);
+ bucketPurgeKeysSizeMap.clear();
purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess,
requestBuilder);
requestBuilder = getPurgeKeysRequest(snapTableKey,
expectedPreviousSnapshotId);
currSize = baseSize;
@@ -474,23 +572,23 @@ private void processDeletedKeysForStore(SnapshotInfo
currentSnapshotInfo, KeyMan
PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null
? keyManager.getPendingDeletionKeys(reclaimableKeyFilter,
remainNum)
: keyManager.getPendingDeletionKeys(volume, bucket, null,
reclaimableKeyFilter, remainNum);
- List<BlockGroup> keyBlocksList =
pendingKeysDeletion.getKeyBlocksList();
+ Map<String, PurgedKey> purgedKeys =
pendingKeysDeletion.getPurgedKeys();
//submit purge requests if there are renamed entries to be purged or keys to be purged.
- if (!renamedTableEntries.isEmpty() || keyBlocksList != null &&
!keyBlocksList.isEmpty()) {
+ if (!renamedTableEntries.isEmpty() || purgedKeys != null &&
!purgedKeys.isEmpty()) {
// Validating if the previous snapshot is still the same before purging the blocks.
SnapshotUtils.validatePreviousSnapshotId(currentSnapshotInfo,
snapshotChainManager,
expectedPreviousSnapshotId);
- Pair<Pair<Integer, Long>, Boolean> purgeResult =
processKeyDeletes(keyBlocksList,
+ Pair<Pair<Integer, Long>, Boolean> purgeResult =
processKeyDeletes(purgedKeys,
pendingKeysDeletion.getKeysToModify(), renamedTableEntries,
snapshotTableKey,
- expectedPreviousSnapshotId,
pendingKeysDeletion.getKeyBlockReplicatedSize());
+ expectedPreviousSnapshotId);
remainNum -= purgeResult.getKey().getKey();
successStatus = purgeResult.getValue();
- getMetrics().incrNumKeysProcessed(keyBlocksList.size());
+ getMetrics().incrNumKeysProcessed(purgedKeys.size());
getMetrics().incrNumKeysSentForPurge(purgeResult.getKey().getKey());
DeletionStats statsToUpdate = currentSnapshotInfo == null ?
aosDeletionStats : snapshotDeletionStats;
statsToUpdate.updateDeletionStats(purgeResult.getKey().getKey(),
purgeResult.getKey().getValue(),
- keyBlocksList.size() +
pendingKeysDeletion.getNotReclaimableKeyCount(),
+ purgedKeys.size() +
pendingKeysDeletion.getNotReclaimableKeyCount(),
pendingKeysDeletion.getNotReclaimableKeyCount()
);
if (successStatus) {
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
index 43195d1abf1..ff3fc5f31b9 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
@@ -68,6 +68,7 @@
import
org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
import org.apache.hadoop.util.Time;
@@ -148,13 +149,16 @@ private OMRequest createPurgeKeysRequest(String
fromSnapshot, String purgeDelete
}
private OMRequest createPurgeKeysRequest(String fromSnapshot,
- List<PurgePathRequest> purgePathRequestList) {
+ List<PurgePathRequest> purgePathRequestList, List<BucketNameInfo>
bucketInfoList) {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest
=
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
purgeDirRequest.addAllDeletedPath(purgePathRequestList);
if (fromSnapshot != null) {
purgeDirRequest.setSnapshotTableKey(fromSnapshot);
}
+ if (bucketInfoList != null) {
+ purgeDirRequest.addAllBucketNameInfos(bucketInfoList);
+ }
OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
@@ -180,7 +184,9 @@ private OMRequest createPurgeKeysRequest(String
fromSnapshot, String purgeDelete
PurgePathRequest request = wrapPurgeRequest(
volumeId, bucketId, purgeDeletedDir, subFiles, subDirs);
purgePathRequestList.add(request);
- return createPurgeKeysRequest(fromSnapshot, purgePathRequestList);
+ return createPurgeKeysRequest(fromSnapshot, purgePathRequestList,
Collections.singletonList(
+
BucketNameInfo.newBuilder().setVolumeName(volumeName).setBucketName(bucketName)
+ .setBucketId(bucketId).setVolumeId(volumeId).buildPartial()));
}
private PurgePathRequest wrapPurgeRequest(
@@ -294,8 +300,9 @@ public void testBucketLockWithPurgeDirectory() throws
Exception {
// Add volume, bucket and key entries to OM DB.
OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket2,
omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
- String bucketKey2 = omMetadataManager.getBucketKey(volumeName, bucket1);
+ String bucketKey2 = omMetadataManager.getBucketKey(volumeName, bucket2);
OmBucketInfo bucketInfo2 =
omMetadataManager.getBucketTable().get(bucketKey2);
+ long volumeId = omMetadataManager.getVolumeId(volumeName);
PurgePathRequest purgePathRequest2 =
createBucketDataAndGetPurgePathRequest(bucketInfo2);
IOzoneManagerLock lock = spy(omMetadataManager.getLock());
Set<Long> acquiredLockIds = new ConcurrentSkipListSet<>();
@@ -321,10 +328,19 @@ public void testBucketLockWithPurgeDirectory() throws
Exception {
return lockDetails;
}).when(lock).acquireWriteLocks(eq(BUCKET_LOCK), anyCollection());
when(omMetadataManager.getLock()).thenReturn(lock);
+ List<BucketNameInfo> bucketInfoList = Arrays.asList(
+
BucketNameInfo.newBuilder().setVolumeName(bucketInfo1.getVolumeName())
+ .setBucketName(bucketInfo1.getBucketName())
+
.setBucketId(bucketInfo1.getObjectID()).setVolumeId(volumeId).build(),
+
BucketNameInfo.newBuilder().setVolumeName(bucketInfo2.getVolumeName())
+ .setBucketName(bucketInfo2.getBucketName())
+
.setBucketId(bucketInfo2.getObjectID()).setVolumeId(volumeId).build());
OMDirectoriesPurgeRequestWithFSO purgePathRequests1 = new
OMDirectoriesPurgeRequestWithFSO(
- preExecute(createPurgeKeysRequest(null,
Arrays.asList(purgePathRequest1, purgePathRequest2))));
+ preExecute(createPurgeKeysRequest(null,
Arrays.asList(purgePathRequest1, purgePathRequest2),
+ bucketInfoList)));
OMDirectoriesPurgeRequestWithFSO purgePathRequests2 = new
OMDirectoriesPurgeRequestWithFSO(
- preExecute(createPurgeKeysRequest(null,
Arrays.asList(purgePathRequest2, purgePathRequest1))));
+ preExecute(createPurgeKeysRequest(null,
Arrays.asList(purgePathRequest2, purgePathRequest1),
+ bucketInfoList)));
CompletableFuture future1 = CompletableFuture.runAsync(() ->
purgePathRequests1.validateAndUpdateCache(ozoneManager, 100L));
CompletableFuture future2 = CompletableFuture.runAsync(() ->
@@ -459,6 +475,7 @@ public void testValidateAndUpdateCacheCheckQuota() throws
Exception {
omBucketInfo = omMetadataManager.getBucketTable().get(
bucketKey);
assertEquals(0L * deletedKeyNames.size(), omBucketInfo.getUsedBytes());
+ assertEquals(1000L * deletedKeyNames.size(),
omBucketInfo.getSnapshotUsedBytes());
performBatchOperationCommit(omClientResponse);
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index 291c5cd11f2..aa566859cb4 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -163,7 +163,7 @@ public void testValidateAndUpdateCache() throws Exception {
OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
omResponse, deleteKeysAndRenamedEntry.getKey(),
deleteKeysAndRenamedEntry.getValue(), null,
- null);
+ null, null);
omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
// Do manual commit and see whether addToBatch is successful or not.
@@ -239,7 +239,7 @@ public void testKeyPurgeInSnapshot() throws Exception {
omMetadataManager.getStore().initBatchOperation()) {
OMKeyPurgeResponse omKeyPurgeResponse = new
OMKeyPurgeResponse(omResponse, deleteKeysAndRenamedEntry.getKey(),
- deleteKeysAndRenamedEntry.getValue(), snapInfo, null);
+ deleteKeysAndRenamedEntry.getValue(), snapInfo, null, null);
omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
// Do manual commit and see whether addToBatch is successful or not.
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index c77cf9b0177..9246df12c9c 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -50,6 +50,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -85,6 +86,7 @@
import org.apache.hadoop.ozone.om.OmTestManagers;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.PendingKeysDeletion;
+import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey;
import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -96,6 +98,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
@@ -140,8 +143,6 @@ class TestKeyDeletingService extends OzoneTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestKeyDeletingService.class);
private static final AtomicInteger OBJECT_COUNTER = new AtomicInteger();
- private static final long DATA_SIZE = 1000L;
-
private OzoneConfiguration conf;
private OzoneManagerProtocol writeClient;
private OzoneManager om;
@@ -240,7 +241,7 @@ void checkIfDeleteServiceIsDeletingKeys()
assertThat(getRunCount()).isGreaterThan(initialRunCount);
assertThat(keyManager.getPendingDeletionKeys(new
ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
null,
- keyManager, om.getMetadataManager().getLock()),
Integer.MAX_VALUE).getKeyBlocksList())
+ keyManager, om.getMetadataManager().getLock()),
Integer.MAX_VALUE).getPurgedKeys())
.isEmpty();
}
@@ -269,7 +270,7 @@ void checkDeletionForKeysWithMultipleVersions() throws
Exception {
1000, 10000);
assertThat(getRunCount())
.isGreaterThan(initialRunCount);
- assertThat(keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE).getKeyBlocksList())
+ assertThat(keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE).getPurgedKeys())
.isEmpty();
// The 1st version of the key has 1 block and the 2nd version has 2
@@ -300,21 +301,34 @@ void checkDeletedTableCleanUpForSnapshot() throws
Exception {
// Create snapshot
String snapName = uniqueObjectName("snap");
writeClient.createSnapshot(volumeName, bucketName1, snapName);
-
+ keyDeletingService.suspend();
// Delete the key
writeClient.deleteKey(key1);
writeClient.deleteKey(key2);
-
+ // Create a key3 in bucket1 which should be reclaimable to check quota usage.
+ OmKeyArgs key3 = createAndCommitKey(volumeName, bucketName1,
uniqueObjectName(keyName), 3);
+ OmBucketInfo bucketInfo = writeClient.getBucketInfo(volumeName,
bucketName1);
+ long key1Size = QuotaUtil.getReplicatedSize(key1.getDataSize(),
key1.getReplicationConfig());
+ long key3Size = QuotaUtil.getReplicatedSize(key3.getDataSize(),
key3.getReplicationConfig());
+
+ assertEquals(key1Size, bucketInfo.getSnapshotUsedBytes());
+ assertEquals(1, bucketInfo.getSnapshotUsedNamespace());
+ writeClient.deleteKey(key3);
+ bucketInfo = writeClient.getBucketInfo(volumeName, bucketName1);
+ assertEquals(key1Size + key3Size, bucketInfo.getSnapshotUsedBytes());
+ assertEquals(2, bucketInfo.getSnapshotUsedNamespace());
+ writeClient.getBucketInfo(volumeName, bucketName1);
+ keyDeletingService.resume();
// Run KeyDeletingService
GenericTestUtils.waitFor(
- () -> getDeletedKeyCount() >= initialDeletedCount + 1,
- 1000, 10000);
+ () -> getDeletedKeyCount() >= initialDeletedCount + 2,
+ 1000, 100000);
assertThat(getRunCount())
.isGreaterThan(initialRunCount);
assertThat(keyManager.getPendingDeletionKeys(new
ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
null,
keyManager, om.getMetadataManager().getLock()),
- Integer.MAX_VALUE).getKeyBlocksList())
+ Integer.MAX_VALUE).getPurgedKeys())
.isEmpty();
// deletedTable should have deleted key of the snapshot bucket
@@ -323,6 +337,9 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception
{
metadataManager.getOzoneKey(volumeName, bucketName1, keyName);
String ozoneKey2 =
metadataManager.getOzoneKey(volumeName, bucketName2, keyName);
+ String ozoneKey3 =
+ metadataManager.getOzoneKey(volumeName, bucketName2,
key3.getKeyName());
+
// key1 belongs to snapshot, so it should not be deleted when
// KeyDeletingService runs. But key2 can be reclaimed as it doesn't
@@ -335,6 +352,13 @@ void checkDeletedTableCleanUpForSnapshot() throws
Exception {
= metadataManager.getDeletedTable().getRangeKVs(
null, 100, ozoneKey2);
assertEquals(0, rangeKVs.size());
+ rangeKVs
+ = metadataManager.getDeletedTable().getRangeKVs(
+ null, 100, ozoneKey3);
+ assertEquals(0, rangeKVs.size());
+ bucketInfo = writeClient.getBucketInfo(volumeName, bucketName1);
+ assertEquals(key1Size, bucketInfo.getSnapshotUsedBytes());
+ assertEquals(1, bucketInfo.getSnapshotUsedNamespace());
}
/*
@@ -418,8 +442,8 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1,
metadataManager);
doAnswer(i -> {
PendingKeysDeletion pendingKeysDeletion = (PendingKeysDeletion)
i.callRealMethod();
- for (BlockGroup group : pendingKeysDeletion.getKeyBlocksList()) {
- Assertions.assertNotEquals(deletePathKey[0], group.getGroupID());
+ for (PurgedKey purgedKey :
pendingKeysDeletion.getPurgedKeys().values()) {
+ Assertions.assertNotEquals(deletePathKey[0],
purgedKey.getBlockGroup().getGroupID());
}
return pendingKeysDeletion;
}).when(km).getPendingDeletionKeys(any(), anyInt());
@@ -663,13 +687,13 @@ void testSnapshotExclusiveSize() throws Exception {
final String testVolumeName = getTestName();
final String testBucketName = uniqueObjectName("bucket");
final String keyName = uniqueObjectName("key");
-
+ Map<Integer, Long> keySizeMap = new HashMap<>();
// Create Volume and Buckets
createVolumeAndBucket(testVolumeName, testBucketName, false);
// Create 3 keys
for (int i = 1; i <= 3; i++) {
- createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
+ keySizeMap.put(i, createAndCommitKey(testVolumeName, testBucketName,
keyName + i, 3).getDataSize());
}
assertTableRowCount(keyTable, initialKeyCount + 3, metadataManager);
@@ -681,7 +705,7 @@ void testSnapshotExclusiveSize() throws Exception {
// Create 2 keys
for (int i = 4; i <= 5; i++) {
- createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
+ keySizeMap.put(i, createAndCommitKey(testVolumeName, testBucketName,
keyName + i, 3).getDataSize());
}
// Delete a key, rename 2 keys. We will be using this to test
// how we handle renamed key for exclusive size calculation.
@@ -699,7 +723,7 @@ void testSnapshotExclusiveSize() throws Exception {
// Create 2 keys
for (int i = 6; i <= 7; i++) {
- createAndCommitKey(testVolumeName, testBucketName, keyName + i, 3);
+ keySizeMap.put(i, createAndCommitKey(testVolumeName, testBucketName,
keyName + i, 3).getDataSize());
}
deleteKey(testVolumeName, testBucketName, "renamedKey1");
@@ -734,14 +758,12 @@ void testSnapshotExclusiveSize() throws Exception {
keyDeletingService.resume();
Map<String, Long> expectedSize = new ImmutableMap.Builder<String, Long>()
- .put(snap1, 1000L)
- .put(snap2, 1000L)
- .put(snap3, 2000L)
+ .put(snap1, keySizeMap.get(3))
+ .put(snap2, keySizeMap.get(4))
+ .put(snap3, keySizeMap.get(6) + keySizeMap.get(7))
.put(snap4, 0L)
.build();
- System.out.println(expectedSize);
-
- // Let KeyDeletingService to run for some iterations
+ // Let KeyDeletingService run for some iterations
GenericTestUtils.waitFor(
() -> (getRunCount() > prevKdsRunCount + 20),
100, 100000);
@@ -756,7 +778,6 @@ void testSnapshotExclusiveSize() throws Exception {
Long expected = expectedSize.getOrDefault(snapshotName,
snapshotInfo.getExclusiveSize());
assertNotNull(expected);
- System.out.println(snapshotName);
assertEquals(expected, snapshotInfo.getExclusiveSize());
// Since for the test we are using RATIS/THREE
assertEquals(expected * 3,
snapshotInfo.getExclusiveReplicatedSize());
@@ -805,8 +826,10 @@ public void testFailingModifiedKeyPurge() throws
IOException, InterruptedExcepti
return
OzoneManagerProtocolProtos.OMResponse.newBuilder().setCmdType(purgeRequest.get().getCmdType())
.setStatus(OzoneManagerProtocolProtos.Status.TIMEOUT).build();
});
- List<BlockGroup> blockGroups =
Collections.singletonList(BlockGroup.newBuilder().setKeyName("key1")
- .addAllBlockIDs(Collections.singletonList(new BlockID(1,
1))).build());
+ BlockGroup blockGroup = BlockGroup.newBuilder().setKeyName("key1/1")
+ .addAllBlockIDs(Collections.singletonList(new BlockID(1,
1))).build();
+ Map<String, PurgedKey> blockGroups =
Collections.singletonMap(blockGroup.getGroupID(), new PurgedKey("vol",
+ "buck", 1, blockGroup, "key1", 30, true));
List<String> renameEntriesToBeDeleted =
Collections.singletonList("key2");
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
.setBucketName("buck")
@@ -820,7 +843,7 @@ public void testFailingModifiedKeyPurge() throws
IOException, InterruptedExcepti
.build();
Map<String, RepeatedOmKeyInfo> keysToModify =
Collections.singletonMap("key1",
new RepeatedOmKeyInfo(Collections.singletonList(omKeyInfo), 0L));
- keyDeletingService.processKeyDeletes(blockGroups, keysToModify,
renameEntriesToBeDeleted, null, null, null);
+ keyDeletingService.processKeyDeletes(blockGroups, keysToModify,
renameEntriesToBeDeleted, null, null);
assertTrue(purgeRequest.get().getPurgeKeysRequest().getKeysToUpdateList().isEmpty());
assertEquals(renameEntriesToBeDeleted,
purgeRequest.get().getPurgeKeysRequest().getRenamedKeysList());
}
@@ -967,9 +990,11 @@ void testLastRunAnd24hMetrics() throws Exception {
writeClient.createSnapshot(volumeName, bucketName, snap2);
// Create and delete 5 more keys.
+ long dataSize = 0L;
for (int i = 16; i <= 20; i++) {
OmKeyArgs args = createAndCommitKey(volumeName, bucketName,
uniqueObjectName("key"), 1);
createdKeys.add(args);
+ dataSize = args.getDataSize();
}
for (int i = 15; i < 20; i++) {
writeClient.deleteKey(createdKeys.get(i));
@@ -997,17 +1022,17 @@ void testLastRunAnd24hMetrics() throws Exception {
GenericTestUtils.waitFor(() -> getDeletedKeyCount() == 10, 100, 10000);
// Verify last run AOS deletion metrics.
assertEquals(5, metrics.getAosKeysReclaimedLast());
- assertEquals(5 * DATA_SIZE * 3, metrics.getAosReclaimedSizeLast());
+ assertEquals(5 * dataSize * 3, metrics.getAosReclaimedSizeLast());
assertEquals(5, metrics.getAosKeysIteratedLast());
assertEquals(0, metrics.getAosKeysNotReclaimableLast());
// Verify last run Snapshot deletion metrics.
assertEquals(5, metrics.getSnapKeysReclaimedLast());
- assertEquals(5 * DATA_SIZE * 3, metrics.getSnapReclaimedSizeLast());
+ assertEquals(5 * dataSize * 3, metrics.getSnapReclaimedSizeLast());
assertEquals(15, metrics.getSnapKeysIteratedLast());
assertEquals(10, metrics.getSnapKeysNotReclaimableLast());
// Verify 24h deletion metrics.
assertEquals(10, metrics.getKeysReclaimedInInterval());
- assertEquals(10 * DATA_SIZE * 3, metrics.getReclaimedSizeInInterval());
+ assertEquals(10 * dataSize * 3, metrics.getReclaimedSizeInInterval());
// Delete snap1. Which also sets the snap2 to be deep cleaned.
writeClient.deleteSnapshot(volumeName, bucketName, snap1);
@@ -1035,12 +1060,12 @@ void testLastRunAnd24hMetrics() throws Exception {
assertEquals(0, metrics.getAosKeysNotReclaimableLast());
// Verify last run Snapshot deletion metrics.
assertEquals(10, metrics.getSnapKeysReclaimedLast());
- assertEquals(10 * DATA_SIZE * 3, metrics.getSnapReclaimedSizeLast());
+ assertEquals(10 * dataSize * 3, metrics.getSnapReclaimedSizeLast());
assertEquals(10, metrics.getSnapKeysIteratedLast());
assertEquals(0, metrics.getSnapKeysNotReclaimableLast());
// Verify 24h deletion metrics.
assertEquals(20, metrics.getKeysReclaimedInInterval());
- assertEquals(20 * DATA_SIZE * 3, metrics.getReclaimedSizeInInterval());
+ assertEquals(20 * dataSize * 3, metrics.getReclaimedSizeInInterval());
}
}
@@ -1245,6 +1270,7 @@ private void createVolumeAndBucket(String volumeName,
OMRequestTestUtils.addBucketToOM(keyManager.getMetadataManager(),
OmBucketInfo.newBuilder().setVolumeName(volumeName)
.setBucketName(bucketName)
+ .setObjectID(OBJECT_COUNTER.incrementAndGet())
.setIsVersionEnabled(isVersioningEnabled)
.build());
}
@@ -1314,10 +1340,11 @@ private OmKeyArgs createAndCommitKey(String volumeName,
List<OmKeyLocationInfo> latestBlocks = keyLocationVersions
.getBlocksLatestVersionOnly();
-
+ long size = 0;
int preAllocatedSize = latestBlocks.size();
for (OmKeyLocationInfo block : latestBlocks) {
keyArg.addLocationInfo(block);
+ size += block.getLength();
}
LinkedList<OmKeyLocationInfo> allocated = new LinkedList<>();
@@ -1331,8 +1358,9 @@ private OmKeyArgs createAndCommitKey(String volumeName,
for (OmKeyLocationInfo block : allocated) {
keyArg.addLocationInfo(block);
+ size += block.getLength();
}
-
+ keyArg.setDataSize(size);
customWriteClient.commitKey(keyArg, session.getId());
return keyArg;
}
@@ -1352,7 +1380,7 @@ private long getRunCount() {
private int countKeysPendingDeletion() {
try {
final int count = keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE)
- .getKeyBlocksList().size();
+ .getPurgedKeys().size();
LOG.debug("KeyManager keys pending deletion: {}", count);
return count;
} catch (IOException e) {
@@ -1363,8 +1391,9 @@ private int countKeysPendingDeletion() {
private long countBlocksPendingDeletion() {
try {
return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE)
- .getKeyBlocksList()
+ .getPurgedKeys().values()
.stream()
+ .map(PurgedKey::getBlockGroup)
.map(BlockGroup::getBlockIDList)
.mapToLong(Collection::size)
.sum();
@@ -1374,6 +1403,6 @@ private long countBlocksPendingDeletion() {
}
private static String uniqueObjectName(String prefix) {
- return prefix + OBJECT_COUNTER.getAndIncrement();
+ return prefix + String.format("%010d", OBJECT_COUNTER.getAndIncrement());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]