This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-13323-sts
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-13323-sts by this push:
new c63f0096315 HDDS-14094. [STS] Background service to remove revoked
tokens that are past expiration from DB (#9468)
c63f0096315 is described below
commit c63f0096315903435666101c4d416d6d94f4286e
Author: fmorg-git <[email protected]>
AuthorDate: Thu Jan 8 22:43:24 2026 -0800
HDDS-14094. [STS] Background service to remove revoked tokens that are past
expiration from DB (#9468)
---
.../common/src/main/resources/ozone-default.xml | 21 +
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 3 +-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 10 +
.../src/main/proto/OmClientProtocol.proto | 24 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 17 +
.../om/ratis/utils/OzoneManagerRatisUtils.java | 3 +
.../security/S3DeleteRevokedSTSTokensRequest.java | 72 ++++
.../security/S3DeleteRevokedSTSTokensResponse.java | 66 +++
.../om/service/RevokedSTSTokenCleanupService.java | 267 ++++++++++++
.../service/TestRevokedSTSTokenCleanupService.java | 448 +++++++++++++++++++++
.../hadoop/ozone/om/service/package-info.java | 21 +
11 files changed, 946 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 1e8df1c6747..f3ea84abb4a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4858,4 +4858,25 @@
<value>5m</value>
<description>Interval for cleaning up orphan snapshot local data versions
corresponding to snapshots</description>
</property>
+
+ <property>
+ <name>ozone.om.sts.token.cleanup.service.interval</name>
+ <value>3h</value>
+ <tag>OZONE, OM, PERFORMANCE, SECURITY</tag>
+ <description>
+ A background job that periodically checks revoked STS token entries and
+ deletes those that have existed for more than 12 hours. This entry controls the interval of this
+ cleanup check. Unit could be defined with postfix (ns,ms,s,m,h,d).
+ </description>
+ </property>
+ <property>
+ <name>ozone.om.sts.token.cleanup.service.timeout</name>
+ <value>15m</value>
+ <tag>OZONE, OM, PERFORMANCE, SECURITY</tag>
+ <description>
+ A timeout value for the revoked STS token cleanup service. If this is set
+ greater than 0, the service will stop waiting for the deletion
+ completion after this time. Unit could be defined with postfix
(ns,ms,s,m,h,d).
+ </description>
+ </property>
</configuration>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index dd70a9056f9..9707f7b0cdb 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -344,6 +344,7 @@ public static boolean isReadOnly(
case QuotaRepair:
case PutObjectTagging:
case DeleteObjectTagging:
+ case DeleteRevokedSTSTokens:
case UnknownCommand:
return false;
case EchoRPC:
@@ -364,7 +365,7 @@ public static byte[] getSHADigest() throws IOException {
"This could possibly indicate a faulty JRE");
}
}
-
+
/**
* Get a collection of all active omNodeIds (excluding decommissioned nodes)
* for the given omServiceId.
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 469900aa8ea..a3d9a2063cf 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -682,6 +682,16 @@ public final class OMConfigKeys {
"ozone.om.snapshot.local.data.manager.service.interval";
public static final String
OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT = "5m";
+ public static final String OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_INTERVAL
+ = "ozone.om.sts.token.cleanup.service.interval";
+ public static final String
OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_INTERVAL_DEFAULT
+ = "3h";
+
+ public static final String OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT
+ = "ozone.om.sts.token.cleanup.service.timeout";
+ public static final String OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT_DEFAULT
+ = "15m";
+
/**
* Never constructed.
*/
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 08eb79fbd9e..00892f79a71 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -158,6 +158,7 @@ enum Type {
DeleteObjectTagging = 142;
AssumeRole = 143;
RevokeSTSToken = 144;
+ DeleteRevokedSTSTokens = 145;
}
enum SafeMode {
@@ -308,6 +309,7 @@ message OMRequest {
repeated SetSnapshotPropertyRequest SetSnapshotPropertyRequests =
143;
optional AssumeRoleRequest assumeRoleRequest =
144;
optional RevokeSTSTokenRequest revokeSTSTokenRequest =
145;
+ optional DeleteRevokedSTSTokensRequest deleteRevokedSTSTokensRequest =
146;
}
message OMResponse {
@@ -438,11 +440,12 @@ message OMResponse {
optional GetQuotaRepairStatusResponse GetQuotaRepairStatusResponse =
136;
optional StartQuotaRepairResponse StartQuotaRepairResponse =
137;
- optional GetObjectTaggingResponse getObjectTaggingResponse =
140;
- optional PutObjectTaggingResponse putObjectTaggingResponse =
141;
- optional DeleteObjectTaggingResponse deleteObjectTaggingResponse =
142;
- optional AssumeRoleResponse assumeRoleResponse =
143;
- optional RevokeSTSTokenResponse revokeSTSTokenResponse =
144;
+ optional GetObjectTaggingResponse getObjectTaggingResponse =
140;
+ optional PutObjectTaggingResponse putObjectTaggingResponse =
141;
+ optional DeleteObjectTaggingResponse deleteObjectTaggingResponse =
142;
+ optional AssumeRoleResponse assumeRoleResponse =
143;
+ optional RevokeSTSTokenResponse revokeSTSTokenResponse =
144;
+ optional DeleteRevokedSTSTokensResponse deleteRevokedSTSTokensResponse =
145;
}
enum Status {
@@ -2393,6 +2396,17 @@ message RevokeSTSTokenRequest {
message RevokeSTSTokenResponse {
}
+/**
+ This will contain a list of revoked STS session tokens whose entries should
be removed from
+ the s3RevokedStsTokenTable.
+*/
+message DeleteRevokedSTSTokensRequest {
+ repeated string sessionToken = 1;
+}
+
+message DeleteRevokedSTSTokensResponse {
+}
+
/**
The OM service that takes care of Ozone namespace.
*/
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e6c5f916cc1..7917130950e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -287,6 +287,7 @@
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
+import org.apache.hadoop.ozone.om.service.RevokedSTSTokenCleanupService;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
@@ -438,6 +439,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private final boolean isSpnegoEnabled;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
+ private RevokedSTSTokenCleanupService revokedSTSTokenCleanupService;
private final boolean isOmGrpcServerEnabled;
private volatile boolean isOmRpcServerRunning = false;
private volatile boolean isOmGrpcServerRunning = false;
@@ -1946,6 +1948,18 @@ public void start() throws IOException {
keyManager.start(configuration);
+ final long stsTokenCleanupInterval = configuration.getTimeDuration(
+ OMConfigKeys.OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_INTERVAL,
+ OMConfigKeys.OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ final long stsTokenCleanupTimeout = configuration.getTimeDuration(
+ OMConfigKeys.OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT,
+ OMConfigKeys.OZONE_OM_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ revokedSTSTokenCleanupService = new RevokedSTSTokenCleanupService(
+ stsTokenCleanupInterval, TimeUnit.MILLISECONDS,
stsTokenCleanupTimeout, this);
+ revokedSTSTokenCleanupService.start();
+
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -2524,6 +2538,9 @@ public boolean stop() {
if (edekCacheLoader != null) {
edekCacheLoader.shutdown();
}
+ if (revokedSTSTokenCleanupService != null) {
+ revokedSTSTokenCleanupService.shutdown();
+ }
return true;
} catch (Exception e) {
LOG.error("OzoneManager stop failed.", e);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 706e00f9537..4f1b2fc952d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
import
org.apache.hadoop.ozone.om.request.s3.multipart.S3ExpiredMultipartUploadsAbortRequest;
import org.apache.hadoop.ozone.om.request.s3.security.OMSetSecretRequest;
+import
org.apache.hadoop.ozone.om.request.s3.security.S3DeleteRevokedSTSTokensRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSTSTokenRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSecretRequest;
@@ -199,6 +200,8 @@ public static OMClientRequest createClientRequest(OMRequest
omRequest,
return new S3RevokeSecretRequest(omRequest);
case RevokeSTSToken:
return new S3RevokeSTSTokenRequest(omRequest);
+ case DeleteRevokedSTSTokens:
+ return new S3DeleteRevokedSTSTokensRequest(omRequest);
case PurgeKeys:
return new OMKeyPurgeRequest(omRequest);
case PurgeDirectories:
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3DeleteRevokedSTSTokensRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3DeleteRevokedSTSTokensRequest.java
new file mode 100644
index 00000000000..ee2a8656445
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3DeleteRevokedSTSTokensRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.security;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import
org.apache.hadoop.ozone.om.response.s3.security.S3DeleteRevokedSTSTokensResponse;
+import org.apache.hadoop.ozone.om.service.RevokedSTSTokenCleanupService;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteRevokedSTSTokensRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.security.UserGroupInformation;
+import
org.apache.hadoop.security.authentication.client.AuthenticationException;
+
+/**
+ * Handles DeleteRevokedSTSTokens requests submitted by {@link
RevokedSTSTokenCleanupService}.
+ */
+public class S3DeleteRevokedSTSTokensRequest extends OMClientRequest {
+
+ public S3DeleteRevokedSTSTokensRequest(OMRequest omRequest) {
+ super(omRequest);
+ }
+
+ @Override
+ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+ final UserGroupInformation ugi;
+ try {
+ ugi = createUGI();
+ } catch (AuthenticationException e) {
+ throw new OMException(e, OMException.ResultCodes.PERMISSION_DENIED);
+ }
+ if (!ozoneManager.isAdmin(ugi) && !ozoneManager.isS3Admin(ugi)) {
+ throw new OMException("Only admins can delete revoked STS tokens",
OMException.ResultCodes.PERMISSION_DENIED);
+ }
+
+ return getOmRequest().toBuilder()
+ .setUserInfo(getUserInfo())
+ .build();
+ }
+
+ @Override
+ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
ExecutionContext context) {
+ final DeleteRevokedSTSTokensRequest request =
getOmRequest().getDeleteRevokedSTSTokensRequest();
+ final OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
+
+ final List<String> sessionTokens = request.getSessionTokenList();
+ return new S3DeleteRevokedSTSTokensResponse(sessionTokens,
omResponse.build());
+ }
+}
+
+
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3DeleteRevokedSTSTokensResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3DeleteRevokedSTSTokensResponse.java
new file mode 100644
index 00000000000..cb44e7f466d
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3DeleteRevokedSTSTokensResponse.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.security;
+
+import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.S3_REVOKED_STS_TOKEN_TABLE;
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+/**
+ * Response for DeleteRevokedSTSTokens request.
+ */
+@CleanupTableInfo(cleanupTables = {S3_REVOKED_STS_TOKEN_TABLE})
+public class S3DeleteRevokedSTSTokensResponse extends OMClientResponse {
+
+ private final List<String> sessionTokens;
+
+ public S3DeleteRevokedSTSTokensResponse(List<String> sessionTokens, @Nonnull
OMResponse omResponse) {
+ super(omResponse);
+ this.sessionTokens = sessionTokens;
+ }
+
+ @Override
+ public void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation
batchOperation) throws IOException {
+ if (sessionTokens == null || sessionTokens.isEmpty()) {
+ return;
+ }
+ if (!getOMResponse().hasStatus() || getOMResponse().getStatus() != OK) {
+ return;
+ }
+
+ final Table<String, Long> table =
omMetadataManager.getS3RevokedStsTokenTable();
+ if (table == null) {
+ return;
+ }
+
+ for (String sessionToken : sessionTokens) {
+ table.deleteWithBatch(batchOperation, sessionToken);
+ }
+ }
+}
+
+
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RevokedSTSTokenCleanupService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RevokedSTSTokenCleanupService.java
new file mode 100644
index 00000000000..3d9668d6469
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RevokedSTSTokenCleanupService.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteRevokedSTSTokensRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service that periodically scans the revoked STS token table and
submits OM requests to
+ * remove entries that have been present past the cleanup threshold.
+ */
+public class RevokedSTSTokenCleanupService extends BackgroundService {
+ private static final Logger LOG =
LoggerFactory.getLogger(RevokedSTSTokenCleanupService.class);
+
+ // Use a single thread
+ private static final int REVOKED_STS_TOKEN_CLEANER_CORE_POOL_SIZE = 1;
+ private static final Clock CLOCK = Clock.system(ZoneOffset.UTC);
+ private static final long CLEANUP_THRESHOLD = 12 * 60 * 60 * 1000L; // 12
hours in milliseconds
+
+ private final OzoneManager ozoneManager;
+ private final OMMetadataManager metadataManager;
+ private final AtomicBoolean suspended;
+ private final AtomicLong runCount;
+ private final AtomicLong submittedDeletedEntryCount;
+ private final AtomicLong callIdCount;
+ // Dummy client ID to use for response, since this is triggered by a
+ // service, not the client.
+ private final ClientId clientId = ClientId.randomId();
+ private final int ratisByteLimit;
+
+ /**
+ * Creates a Revoked STS Token cleanup service.
+ *
+ * @param interval the interval between successive runs
+ * @param unit the time unit for {@code interval}
+ * @param serviceTimeout timeout for a single run
+ * @param ozoneManager the OzoneManager instance
+ */
+ public RevokedSTSTokenCleanupService(long interval, TimeUnit unit, long
serviceTimeout, OzoneManager ozoneManager) {
+ super(
+ "RevokedSTSTokenCleanupService", interval, unit,
REVOKED_STS_TOKEN_CLEANER_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.metadataManager = ozoneManager.getMetadataManager();
+ this.suspended = new AtomicBoolean(false);
+ this.runCount = new AtomicLong(0);
+ this.submittedDeletedEntryCount = new AtomicLong(0);
+ this.callIdCount = new AtomicLong(0);
+ int limit = (int) ozoneManager.getConfiguration().getStorageSize(
+ OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+ OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
+ // Always go to 90% of max limit for request as other header(s) will be
added
+ this.ratisByteLimit = (int) (limit * 0.9);
+ }
+
+ /**
+ * Returns the number of times this Background service has run.
+ * @return Long, run count.
+ */
+ @VisibleForTesting
+ public long getRunCount() {
+ return runCount.get();
+ }
+
+ /**
+ * Returns the number of entries this Background service has submitted for
deletion.
+ * @return Long, submitted for deletion entry count.
+ */
+ @VisibleForTesting
+ public long getSubmittedDeletedEntryCount() {
+ return submittedDeletedEntryCount.get();
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ final BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new RevokedSTSTokenCleanupTask());
+ return queue;
+ }
+
+ private boolean shouldRun() {
+ return !suspended.get() && ozoneManager.isLeaderReady();
+ }
+
+ private class RevokedSTSTokenCleanupTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ if (!shouldRun()) {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ final long startTime = Time.monotonicNow();
+ runCount.incrementAndGet();
+ final Table<String, Long> revokedStsTokenTable =
metadataManager.getS3RevokedStsTokenTable();
+
+ long deletedInRun = 0;
+ final List<String> batch = new ArrayList<>();
+
+ try (Table.KeyValueIterator<String, Long> iterator =
revokedStsTokenTable.iterator()) {
+ iterator.seekToFirst();
+ while (iterator.hasNext()) {
+ final Table.KeyValue<String, Long> entry = iterator.next();
+ final String sessionToken = entry.getKey();
+ final Long initialCreationTimeMillis = entry.getValue();
+
+ if (shouldCleanup(initialCreationTimeMillis)) {
+ // Calculate the size this token would add to the protobuf message.
+ // Make a copy of the batch to do the size check
+ final List<String> batchCopyWithCandidate = new ArrayList<>(batch);
+ batchCopyWithCandidate.add(sessionToken);
+ int batchWithCandidateSize =
getBatchSerializedSize(batchCopyWithCandidate);
+
+ // If adding this token would exceed the limit, submit the current
batch
+ if (batchWithCandidateSize > ratisByteLimit) {
+ if (!batch.isEmpty()) {
+ if (submitCleanupRequest(batch)) {
+ deletedInRun += batch.size();
+ } else {
+ LOG.warn("Failed to submit batch of {} revoked tokens.",
batch.size());
+ }
+ batch.clear();
+
+ // Re-calculate the size of the candidate token alone in an
empty batch
+ // to check if it exceeds the limit by itself.
+ final List<String> singleCandidateBatch = new ArrayList<>();
+ singleCandidateBatch.add(sessionToken);
+ batchWithCandidateSize =
getBatchSerializedSize(singleCandidateBatch);
+ }
+
+ // Check if the single token exceeds the limit (either strictly
single or after flush)
+ if (batchWithCandidateSize > ratisByteLimit) {
+ LOG.error(
+ "Single revoked STS Token size ({}) would exceed the
ratisByteLimit ({}). SessionToken " +
+ "initialCreationTimeMillis: {}", batchWithCandidateSize,
ratisByteLimit, initialCreationTimeMillis);
+ continue;
+ }
+ }
+ batch.add(sessionToken);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failure while scanning s3RevokedStsTokenTable. It will be
retried in the next interval", e);
+ if (deletedInRun == 0) {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ }
+
+ // Submit any remaining tokens
+ if (!batch.isEmpty()) {
+ if (submitCleanupRequest(batch)) {
+ deletedInRun += batch.size();
+ } else {
+ LOG.warn("Failed to submit final batch of {} revoked tokens.",
batch.size());
+ }
+ }
+
+ // Update stats
+ if (deletedInRun > 0) {
+ submittedDeletedEntryCount.addAndGet(deletedInRun);
+ LOG.info("Found and removed {} revoked STS token entries.",
deletedInRun);
+ }
+
+ final long elapsed = Time.monotonicNow() - startTime;
+ LOG.info("RevokedSTSTokenCleanupService run completed.
deletedEntriesInRun={}, totalDeletedEntries={}, " +
+ "callIdCount={}, elapsedTimeMs={}", deletedInRun,
submittedDeletedEntryCount.get(), callIdCount.get(),
+ elapsed);
+
+ final long resultCount = deletedInRun;
+ return () -> (int) resultCount;
+ }
+
+ /**
+ * Returns true if the given STS session token has been in the table past
the cleanup threshold.
+ */
+ private boolean shouldCleanup(long initialCreationTimeMillis) {
+ final long now = CLOCK.millis();
+
+ if (now - initialCreationTimeMillis > CLEANUP_THRESHOLD) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Revoked STS token entry created at {} is older than 12 hours,
will clean up. Current time: {}",
+ initialCreationTimeMillis, now);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Builds and submits an OMRequest to delete the provided revoked STS
token(s).
+ */
+ private boolean submitCleanupRequest(List<String> sessionTokens) {
+ final DeleteRevokedSTSTokensRequest request =
DeleteRevokedSTSTokensRequest.newBuilder()
+ .addAllSessionToken(sessionTokens)
+ .build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.DeleteRevokedSTSTokens)
+ .setDeleteRevokedSTSTokensRequest(request)
+ .setClientId(clientId.toString())
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .build();
+
+ try {
+ final OMResponse omResponse = OzoneManagerRatisUtils.submitRequest(
+ ozoneManager, omRequest, clientId, callIdCount.incrementAndGet());
+ return omResponse != null && omResponse.getSuccess();
+ } catch (ServiceException e) {
+ LOG.error("Revoked STS token cleanup request failed. Will retry at
next run.", e);
+ return false;
+ }
+ }
+
+ private int getBatchSerializedSize(List<String> sessionTokenBatch) {
+ final DeleteRevokedSTSTokensRequest request =
DeleteRevokedSTSTokensRequest.newBuilder()
+ .addAllSessionToken(sessionTokenBatch)
+ .build();
+
+ return request.getSerializedSize();
+ }
+ }
+}
+
+
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestRevokedSTSTokenCleanupService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestRevokedSTSTokenCleanupService.java
new file mode 100644
index 00000000000..1cf459d0324
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestRevokedSTSTokenCleanupService.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ServiceException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.db.StringInMemoryTestTable;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteRevokedSTSTokensRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ozone.test.TestClock;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+/**
+ * Unit tests for {@link RevokedSTSTokenCleanupService}.
+ *
+ * <p>Each test seeds an in-memory revoked STS token table, replaces the static
+ * {@link OzoneManagerRatisUtils#submitRequest} call with a Mockito static mock, runs the service's periodic
+ * task once via {@code runPeriodicalTaskNow()}, and then checks the submitted requests and service counters.
+ */
+public class TestRevokedSTSTokenCleanupService {
+  private OzoneManager ozoneManager;
+  private StringInMemoryTestTable<Long> revokedStsTokenTable;
+  private TestClock testClock;
+  private OzoneConfiguration ozoneConfiguration;
+
+  @BeforeEach
+  public void setUp() {
+    testClock = TestClock.newInstance();
+    ozoneConfiguration = new OzoneConfiguration();
+    revokedStsTokenTable = new StringInMemoryTestTable<>();
+
+    final OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
+    when(omMetadataManager.getS3RevokedStsTokenTable()).thenReturn(revokedStsTokenTable);
+
+    ozoneManager = mock(OzoneManager.class);
+    when(ozoneManager.isLeaderReady()).thenReturn(true);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
+    when(ozoneManager.getThreadNamePrefix()).thenReturn("om-");
+  }
+
+  @Test
+  public void submitsCleanupRequestForOnlyExpiredTokens() throws Exception {
+    // With one expired and one non-expired revoked entry, only the expired session token should be
+    // submitted for cleanup.
+    final long nowMillis = testClock.millis();
+    revokedStsTokenTable.put("session-token-a", hoursBefore(nowMillis, 13)); // older than 12h threshold
+    revokedStsTokenTable.put("session-token-b", hoursBefore(nowMillis, 1));
+
+    final AtomicReference<OMRequest> capturedRequest = new AtomicReference<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequest::set);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(1);
+      assertThat(sessionTokensOfDeleteRequest(capturedRequest.get())).containsExactly("session-token-a");
+    }
+  }
+
+  @Test
+  public void doesNotSubmitRequestWhenThereAreNoExpiredTokens() throws Exception {
+    // If only non-expired entries exist in the revoked sts token table, no cleanup request should be
+    // submitted and no metrics should be updated.
+    final long nowMillis = testClock.millis();
+    revokedStsTokenTable.put("session-token-c", hoursBefore(nowMillis, 1));
+    revokedStsTokenTable.put("session-token-d", hoursBefore(nowMillis, 2));
+
+    final AtomicReference<OMRequest> capturedRequest = new AtomicReference<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequest::set);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isZero();
+      assertThat(capturedRequest.get()).isNull();
+    }
+  }
+
+  @Test
+  public void handlesNoEntriesInRevokedSTSTokenTable() throws Exception {
+    // If the table is empty (which most of the time it will be), no cleanup requests should be submitted
+    // and no metrics should be updated.
+    final AtomicReference<OMRequest> capturedRequest = new AtomicReference<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequest::set);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isZero();
+      assertThat(capturedRequest.get()).isNull();
+    }
+  }
+
+  @Test
+  public void doesNotUpdateMetricsOnRatisSubmissionServiceExceptionFailure() throws Exception {
+    // If there are expired tokens in the table but the OM request submission to clean up the entries
+    // fails with a service exception, the metrics should not be updated.
+    final long nowMillis = testClock.millis();
+    revokedStsTokenTable.put("session-token-e", hoursBefore(nowMillis, 13));
+    revokedStsTokenTable.put("session-token-f", hoursBefore(nowMillis, 14));
+
+    final AtomicInteger submitAttempts = new AtomicInteger(0);
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      // Simulate Ratis submission failure
+      mockRatisSubmitToFail(ratisUtilsMock, submitAttempts);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(submitAttempts.get()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isZero();
+    }
+  }
+
+  @Test
+  public void doesNotUpdateMetricsOnNonSuccessfulResponse() throws Exception {
+    // If there is an expired token in the table but the OM request submission to clean up the entries
+    // gets a non-successful response, the metrics should not be updated.
+    final long nowMillis = testClock.millis();
+    revokedStsTokenTable.put("session-token-f", hoursBefore(nowMillis, 20));
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      // Return a non-successful response
+      mockRatisSubmitWithInternalErrorResponse(ratisUtilsMock);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isZero();
+    }
+  }
+
+  @Test
+  public void handlesAllExpiredTokens() throws Exception {
+    // If all the tokens in the table are expired on a particular run, ensure the metrics are updated
+    // appropriately.
+    final long nowMillis = testClock.millis();
+    revokedStsTokenTable.put("session-token-g", hoursBefore(nowMillis, 13));
+    revokedStsTokenTable.put("session-token-h", hoursBefore(nowMillis, 14));
+    revokedStsTokenTable.put("session-token-i", hoursBefore(nowMillis, 15));
+
+    final AtomicReference<OMRequest> capturedRequest = new AtomicReference<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequest::set);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(3);
+      assertThat(sessionTokensOfDeleteRequest(capturedRequest.get()))
+          .containsExactlyInAnyOrder("session-token-g", "session-token-h", "session-token-i");
+    }
+  }
+
+  @Test
+  public void submitsMultipleRequestsWhenBatchSizeIsExceeded() throws Exception {
+    // If the tokens exceed the configured batch size, multiple requests should be submitted.
+    final long nowMillis = testClock.millis();
+
+    // Create 10 expired tokens
+    for (int i = 0; i < 10; i++) {
+      revokedStsTokenTable.put("session-token-" + i, hoursBefore(nowMillis, 13));
+    }
+
+    // Set a very small ratisByteLimit (100 bytes) to force batching. A single token request will be small,
+    // but 10 will exceed this. The effective limit will be 90 bytes (90% of 100).
+    setRatisByteLimit(100);
+
+    final List<OMRequest> capturedRequests = new ArrayList<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequests::add);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      // The 10 tokens should be split across exactly two requests.
+      assertThat(capturedRequests).hasSize(2);
+
+      // Verify all tokens were included across the requests
+      final int totalTokens = capturedRequests.stream()
+          .mapToInt(r -> r.getDeleteRevokedSTSTokensRequest().getSessionTokenList().size())
+          .sum();
+      assertThat(totalTokens).isEqualTo(10);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(10);
+    }
+  }
+
+  @Test
+  public void testSingleOversizedExpiredTokenAndItIsTheOnlyExpiredToken() throws Exception {
+    // One sessionToken is larger than the ratisByteLimit, and it is the only expired token.
+    final long nowMillis = testClock.millis();
+    // Serialized size for largeToken is 102 > 90 (the effective ratisByteLimit).
+    final String largeToken = repeatedA(100);
+    revokedStsTokenTable.put(largeToken, hoursBefore(nowMillis, 13));
+
+    setRatisByteLimit(100);
+
+    final List<OMRequest> capturedRequests = new ArrayList<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequests::add);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      // Single token exceeding ratisByteLimit is skipped
+      assertThat(capturedRequests).isEmpty();
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isZero();
+    }
+  }
+
+  @Test
+  public void testSingleOversizedExpiredTokenAndThereAreMultipleExpiredTokens() throws Exception {
+    // One sessionToken is larger than the ratisByteLimit, and it is not the only expired token.
+    final long nowMillis = testClock.millis();
+    final String smallToken = "session-token-j";
+    final String largeToken = "session-token-k-" + repeatedA(90); // > 90 bytes
+
+    revokedStsTokenTable.put(smallToken, hoursBefore(nowMillis, 13));
+    revokedStsTokenTable.put(largeToken, hoursBefore(nowMillis, 13));
+
+    setRatisByteLimit(100);
+
+    final List<OMRequest> capturedRequests = new ArrayList<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequests::add);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(capturedRequests).hasSize(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void testExpiredAndNonExpiredTokensWithSmallRatisByteLimit() throws Exception {
+    // Expired and non-expired entries with ratisByteLimit of 100
+    final long nowMillis = testClock.millis();
+
+    revokedStsTokenTable.put("session-token-l", hoursBefore(nowMillis, 13));
+    revokedStsTokenTable.put("session-token-m", hoursBefore(nowMillis, 1)); // Should be skipped
+    revokedStsTokenTable.put("session-token-n", hoursBefore(nowMillis, 13));
+
+    setRatisByteLimit(100);
+
+    final List<OMRequest> capturedRequests = new ArrayList<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequests::add);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      // session-token-l and session-token-n fit in one batch. session-token-m is ignored because it is
+      // not expired.
+      assertThat(capturedRequests).hasSize(1);
+      assertThat(capturedRequests.get(0).getDeleteRevokedSTSTokensRequest().getSessionTokenList())
+          .containsExactly("session-token-l", "session-token-n");
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(2);
+    }
+  }
+
+  @Test
+  public void testExpiredTokenMatchesRatisByteLimitExactly() throws Exception {
+    // Force small batch of 100 bytes and test when the batch size is exactly ratisByteLimit
+    final long nowMillis = testClock.millis();
+    final String tokenMatchingRatisByteLimitWhenSerialized = repeatedA(88);
+
+    revokedStsTokenTable.put(tokenMatchingRatisByteLimitWhenSerialized, hoursBefore(nowMillis, 13));
+
+    setRatisByteLimit(100);
+
+    final List<OMRequest> capturedRequests = new ArrayList<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      mockRatisSubmit(ratisUtilsMock, capturedRequests::add);
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(capturedRequests).hasSize(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void testCallIdCountIncreasesAcrossBatches() throws Exception {
+    // Force small batch of 40 bytes (which should trigger multiple calls to
+    // OzoneManagerRatisUtils.submitRequest) and ensure the callIdCount increases across each batch.
+    // session-token-1 and session-token-2 are in first batch, and session-token-3 is in second batch.
+    final long nowMillis = testClock.millis();
+
+    revokedStsTokenTable.put("session-token-1", hoursBefore(nowMillis, 13));
+    revokedStsTokenTable.put("session-token-2", hoursBefore(nowMillis, 13));
+    revokedStsTokenTable.put("session-token-3", hoursBefore(nowMillis, 13));
+
+    setRatisByteLimit(40);
+
+    final List<Long> capturedCallIdCounts = new ArrayList<>();
+
+    try (MockedStatic<OzoneManagerRatisUtils> ratisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+      // Capture the callIdCount (4th argument)
+      ratisUtilsMock.when(
+          () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+          .thenAnswer(invocation -> {
+            capturedCallIdCounts.add(invocation.getArgument(3));
+            return buildOkResponse(invocation.getArgument(1));
+          });
+
+      final RevokedSTSTokenCleanupService cleanupService = createAndRunCleanupService();
+
+      assertThat(cleanupService.getRunCount()).isEqualTo(1);
+      assertThat(cleanupService.getSubmittedDeletedEntryCount()).isEqualTo(3);
+      assertThat(capturedCallIdCounts).hasSize(2);
+      assertThat(capturedCallIdCounts.get(1)).isGreaterThan(capturedCallIdCounts.get(0));
+    }
+  }
+
+  /** Creates a cleanup service against the mocked OzoneManager and runs its periodic task once. */
+  private RevokedSTSTokenCleanupService createAndRunCleanupService() throws Exception {
+    final RevokedSTSTokenCleanupService cleanupService =
+        new RevokedSTSTokenCleanupService(1, TimeUnit.HOURS, 1_000, ozoneManager);
+    cleanupService.runPeriodicalTaskNow();
+    return cleanupService;
+  }
+
+  /** Returns the epoch-millis timestamp lying {@code hours} hours before {@code nowMillis}. */
+  private static long hoursBefore(long nowMillis, long hours) {
+    return nowMillis - TimeUnit.HOURS.toMillis(hours);
+  }
+
+  /** Returns a string made of {@code length} copies of the character {@code 'a'}. */
+  private static String repeatedA(int length) {
+    return new String(new char[length]).replace('\0', 'a');
+  }
+
+  /** Sets the Ratis log appender queue byte limit, in bytes, on the configuration the mocked OM returns. */
+  private void setRatisByteLimit(int limitBytes) {
+    ozoneConfiguration.setStorageSize(
+        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, limitBytes, StorageUnit.BYTES);
+  }
+
+  /**
+   * Asserts that {@code omRequest} is a non-null {@code DeleteRevokedSTSTokens} request and returns the
+   * session tokens it carries.
+   */
+  private static List<String> sessionTokensOfDeleteRequest(OMRequest omRequest) {
+    assertThat(omRequest).isNotNull();
+    assertThat(omRequest.getCmdType()).isEqualTo(Type.DeleteRevokedSTSTokens);
+
+    final DeleteRevokedSTSTokensRequest deleteRevokedSTSTokensRequest =
+        omRequest.getDeleteRevokedSTSTokensRequest();
+    return deleteRevokedSTSTokensRequest.getSessionTokenList();
+  }
+
+  /** Stubs the static submit call to count the attempt and then throw a {@link ServiceException}. */
+  private void mockRatisSubmitToFail(MockedStatic<OzoneManagerRatisUtils> ozoneManagerRatisUtilsMock,
+      AtomicInteger submitAttempts) {
+    ozoneManagerRatisUtilsMock.when(
+        () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+        .thenAnswer(invocation -> {
+          submitAttempts.incrementAndGet();
+          throw new ServiceException("Simulated Ratis failure");
+        });
+  }
+
+  /** Stubs the static submit call to return an unsuccessful {@code INTERNAL_ERROR} response. */
+  private void mockRatisSubmitWithInternalErrorResponse(MockedStatic<OzoneManagerRatisUtils> omRatisUtilsMock) {
+    final OMResponse internalErrorResponse = OMResponse.newBuilder()
+        .setCmdType(Type.DeleteRevokedSTSTokens)
+        .setStatus(Status.INTERNAL_ERROR)
+        .setSuccess(false)
+        .build();
+
+    omRatisUtilsMock.when(
+        () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+        .thenReturn(internalErrorResponse);
+  }
+
+  /** Builds a successful {@code OK} response that echoes the command type of {@code omRequest}. */
+  private static OMResponse buildOkResponse(OMRequest omRequest) {
+    return OMResponse.newBuilder()
+        .setCmdType(omRequest.getCmdType())
+        .setStatus(OK)
+        .setSuccess(true)
+        .build();
+  }
+
+  /**
+   * Stubs the static submit call to hand every submitted request to {@code requestConsumer} and answer
+   * with a successful response.
+   */
+  private void mockRatisSubmit(MockedStatic<OzoneManagerRatisUtils> ozoneManagerRatisUtilsMock,
+      Consumer<OMRequest> requestConsumer) {
+    ozoneManagerRatisUtilsMock.when(
+        () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+        .thenAnswer(invocation -> {
+          final OMRequest omRequest = invocation.getArgument(1);
+          requestConsumer.accept(omRequest);
+          return buildOkResponse(omRequest);
+        });
+  }
+}
+
+
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/package-info.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/package-info.java
new file mode 100644
index 00000000000..71dfeed880d
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for OM services.
+ */
+package org.apache.hadoop.ozone.om.service;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]