This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b5877bebe04 HDDS-14398. Create ObjectOperationHandler interface (#9683)
b5877bebe04 is described below
commit b5877bebe04cd16ac756fa709d7d297f3ab82f0b
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri Jan 30 10:10:01 2026 +0100
HDDS-14398. Create ObjectOperationHandler interface (#9683)
---
.../endpoint/AuditingObjectOperationHandler.java | 94 ++++++++
.../hadoop/ozone/s3/endpoint/EndpointBase.java | 7 +-
.../hadoop/ozone/s3/endpoint/ObjectAclHandler.java | 61 ++++++
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 238 ++++++++++++---------
.../ozone/s3/endpoint/ObjectOperationHandler.java | 50 +++++
.../s3/endpoint/ObjectOperationHandlerChain.java | 107 +++++++++
.../ozone/s3/endpoint/ObjectTaggingHandler.java | 84 ++++++++
.../hadoop/ozone/s3/endpoint/EndpointBuilder.java | 4 +
.../ozone/s3/endpoint/EndpointTestUtils.java | 24 ++-
.../hadoop/ozone/s3/endpoint/TestObjectPut.java | 17 +-
.../hadoop/ozone/s3/endpoint/TestPartUpload.java | 7 +-
11 files changed, 568 insertions(+), 125 deletions(-)
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/AuditingObjectOperationHandler.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/AuditingObjectOperationHandler.java
new file mode 100644
index 00000000000..0f5469cc4dd
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/AuditingObjectOperationHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint.ObjectRequestContext;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+
+/** Performs audit logging for {@link ObjectOperationHandler}s. */
+class AuditingObjectOperationHandler extends ObjectOperationHandler {
+
+ private final ObjectOperationHandler delegate;
+
+ AuditingObjectOperationHandler(ObjectOperationHandler delegate) {
+ this.delegate = delegate;
+ copyDependenciesFrom(delegate);
+ }
+
+ @Override
+  Response handleDeleteRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ try {
+ verifyBucketOwner(context);
+ Response response = delegate.handleDeleteRequest(context, keyName);
+ auditWriteSuccess(context.getAction());
+ return response;
+ } catch (Exception e) {
+ auditWriteFailure(context.getAction(), e);
+ throw e;
+ }
+ }
+
+ @Override
+  Response handleGetRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ try {
+ verifyBucketOwner(context);
+ Response response = delegate.handleGetRequest(context, keyName);
+ auditReadSuccess(context.getAction(), context.getPerf());
+ return response;
+ } catch (Exception e) {
+ auditReadFailure(context.getAction(), e);
+ throw e;
+ }
+ }
+
+ @Override
+  Response handleHeadRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ try {
+ verifyBucketOwner(context);
+ Response response = delegate.handleHeadRequest(context, keyName);
+ auditReadSuccess(context.getAction());
+ return response;
+ } catch (Exception e) {
+ auditReadFailure(context.getAction(), e);
+ throw e;
+ }
+ }
+
+ @Override
+  Response handlePutRequest(ObjectRequestContext context, String keyName, InputStream body)
+ throws IOException, OS3Exception {
+ try {
+ verifyBucketOwner(context);
+ Response response = delegate.handlePutRequest(context, keyName, body);
+ auditWriteSuccess(context.getAction(), context.getPerf());
+ return response;
+ } catch (Exception e) {
+ auditWriteFailure(context.getAction(), e);
+ throw e;
+ }
+ }
+
+  private void verifyBucketOwner(ObjectRequestContext context) throws IOException {
+ if (S3Owner.hasBucketOwnershipVerificationConditions(getHeaders())) {
+      S3Owner.verifyBucketOwnerCondition(getHeaders(), context.getBucketName(), context.getBucket().getOwner());
+ }
+ }
+}
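AuditingObjectOperationHandler above is a decorator: it wraps a delegate handler, runs the bucket-ownership check first, then records audit success or failure around the delegated call and rethrows the original exception. The same shape in a minimal, self-contained sketch (the Handler interface and the println calls are illustrative stand-ins for the audit* helpers, not code from this commit):

    // Decorator that adds audit logging around an arbitrary delegate.
    interface Handler {
      String handle(String key) throws Exception;
    }

    final class AuditingHandler implements Handler {
      private final Handler delegate;

      AuditingHandler(Handler delegate) {
        this.delegate = delegate;
      }

      @Override
      public String handle(String key) throws Exception {
        try {
          String result = delegate.handle(key);
          System.out.println("audit success: " + key); // stand-in for auditWriteSuccess()
          return result;
        } catch (Exception e) {
          System.out.println("audit failure: " + key); // stand-in for auditWriteFailure()
          throw e; // rethrow unchanged so callers still see the original failure
        }
      }
    }

Because every handler extends EndpointBase, the decorator also calls copyDependenciesFrom(delegate) in its constructor, so headers, metrics and client plumbing resolve against the same request state as the wrapped handler.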
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index e5bd02e6805..37e91ef74cd 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -623,6 +623,9 @@ void setOzoneConfiguration(OzoneConfiguration conf) {
* Used for initializing handler instances.
*/
protected void copyDependenciesTo(EndpointBase target) {
+ if (this == target) {
+ return;
+ }
target.queryParams = queryParams;
target.s3Auth = s3Auth;
target.setClient(this.client);
@@ -820,11 +823,11 @@ public int getChunkSize() {
return chunkSize;
}
- public MessageDigest getMD5DigestInstance() {
+ public static MessageDigest getMD5DigestInstance() {
return MD5_PROVIDER.get();
}
- public MessageDigest getSha256DigestInstance() {
+ public static MessageDigest getSha256DigestInstance() {
return SHA_256_PROVIDER.get();
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectAclHandler.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectAclHandler.java
new file mode 100644
index 00000000000..263c093970a
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectAclHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NOT_IMPLEMENTED;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.ozone.audit.S3GAction;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint.ObjectRequestContext;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.hadoop.ozone.s3.util.S3Consts;
+
+/** Not implemented yet. */
+class ObjectAclHandler extends ObjectOperationHandler {
+
+ @Override
+  Response handlePutRequest(ObjectRequestContext context, String keyName, InputStream body) throws IOException, OS3Exception {
+ if (context.ignore(getAction())) {
+ return null;
+ }
+
+ try {
+ throw newError(NOT_IMPLEMENTED, keyName);
+ } catch (Exception e) {
+ getMetrics().updatePutObjectAclFailureStats(context.getStartNanos());
+ throw e;
+ }
+ }
+
+ @SuppressWarnings("SwitchStatementWithTooFewBranches")
+ S3GAction getAction() {
+ if (queryParams().get(S3Consts.QueryParams.ACL) == null) {
+ return null;
+ }
+
+ switch (getContext().getMethod()) {
+ case HttpMethod.PUT:
+ return S3GAction.PUT_OBJECT_ACL;
+ default:
+ return null;
+ }
+ }
+}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index e255378bfa3..d6d9b101c52 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -24,7 +24,6 @@
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
-import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NOT_IMPLEMENTED;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.PRECOND_FAILED;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
@@ -48,6 +47,7 @@
import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
import com.google.common.collect.ImmutableMap;
+import jakarta.annotation.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -65,7 +65,6 @@
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
-import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
@@ -102,7 +101,6 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
-import org.apache.hadoop.ozone.s3.endpoint.S3Tagging.Tag;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.util.RFC1123Util;
@@ -122,7 +120,7 @@
* Key level rest endpoints.
*/
@Path("/{bucket}/{path:.+}")
-public class ObjectEndpoint extends EndpointBase {
+public class ObjectEndpoint extends ObjectOperationHandler {
private static final String BUCKET = "bucket";
private static final String PATH = "path";
@@ -130,6 +128,8 @@ public class ObjectEndpoint extends EndpointBase {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectEndpoint.class);
+ private ObjectOperationHandler handler;
+
/*FOR the feature Overriding Response Header
https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
private final Map<String, String> overrideQueryParameter;
@@ -145,48 +145,84 @@ public ObjectEndpoint() {
.build();
}
+ @Override
+ protected void init() {
+ super.init();
+ ObjectOperationHandler chain = ObjectOperationHandlerChain.newBuilder(this)
+ .add(new ObjectAclHandler())
+ .add(new ObjectTaggingHandler())
+ .add(this)
+ .build();
+ handler = new AuditingObjectOperationHandler(chain);
+ }
+
/**
* Rest endpoint to upload object to a bucket.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html for
* more details.
*/
- @SuppressWarnings("checkstyle:MethodLength")
@PUT
public Response put(
@PathParam(BUCKET) String bucketName,
@PathParam(PATH) String keyPath,
- @HeaderParam(HttpHeaders.CONTENT_LENGTH) long length,
final InputStream body
) throws IOException, OS3Exception {
- final String aclMarker = queryParams().get(QueryParams.ACL);
- final String taggingMarker = queryParams().get(QueryParams.TAGGING);
+    ObjectRequestContext context = new ObjectRequestContext(S3GAction.CREATE_KEY, bucketName);
+ try {
+ return handler.handlePutRequest(context, keyPath, body);
+ } catch (OMException ex) {
+ if (ex.getResult() == ResultCodes.NOT_A_FILE) {
+ OS3Exception os3Exception = newError(INVALID_REQUEST, keyPath, ex);
+ os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
+ "when calling the PutObject/MPU PartUpload operation: " +
+ OmConfig.Keys.ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" +
+ " considered as Unix Paths. Path has Violated FS Semantics " +
+ "which caused put operation to fail.");
+ throw os3Exception;
+ } else if (isAccessDenied(ex)) {
+ throw newError(S3ErrorTable.ACCESS_DENIED, keyPath, ex);
+ } else if (ex.getResult() == ResultCodes.QUOTA_EXCEEDED) {
+ throw newError(S3ErrorTable.QUOTA_EXCEEDED, keyPath, ex);
+ } else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
+ throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
+ } else if (ex.getResult() == ResultCodes.FILE_ALREADY_EXISTS) {
+ throw newError(S3ErrorTable.NO_OVERWRITE, keyPath, ex);
+ } else if (ex.getResult() == ResultCodes.INVALID_REQUEST) {
+ throw newError(S3ErrorTable.INVALID_REQUEST, keyPath);
+ } else if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
+ throw newError(S3ErrorTable.NO_SUCH_KEY, keyPath);
+ } else if (ex.getResult() == ResultCodes.NOT_SUPPORTED_OPERATION) {
+ // e.g. if putObjectTagging operation is applied on FSO directory
+ throw newError(S3ErrorTable.NOT_IMPLEMENTED, keyPath);
+ }
+
+ throw ex;
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:MethodLength")
+  Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStream body) throws IOException, OS3Exception {
final String uploadID = queryParams().get(QueryParams.UPLOAD_ID);
- long startNanos = Time.monotonicNowNanos();
- S3GAction s3GAction = S3GAction.CREATE_KEY;
- boolean auditSuccess = true;
- PerformanceStringBuilder perf = new PerformanceStringBuilder();
+
+ final String bucketName = context.getBucketName();
+ final PerformanceStringBuilder perf = context.getPerf();
+ final long startNanos = context.getStartNanos();
String copyHeader = null;
MultiDigestInputStream multiDigestInputStream = null;
try {
- if (aclMarker != null) {
- s3GAction = S3GAction.PUT_OBJECT_ACL;
- throw newError(NOT_IMPLEMENTED, keyPath);
- }
- OzoneVolume volume = getVolume();
- OzoneBucket bucket = volume.getBucket(bucketName);
-    S3Owner.verifyBucketOwnerCondition(getHeaders(), bucketName, bucket.getOwner());
- if (taggingMarker != null) {
- s3GAction = S3GAction.PUT_OBJECT_TAGGING;
- return putObjectTagging(bucket, keyPath, body);
- }
+ OzoneVolume volume = context.getVolume();
+ OzoneBucket bucket = context.getBucket();
+      final String lengthHeader = getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH);
+ long length = lengthHeader != null ? Long.parseLong(lengthHeader) : 0;
if (uploadID != null && !uploadID.equals("")) {
if (getHeaders().getHeaderString(COPY_SOURCE_HEADER) == null) {
- s3GAction = S3GAction.CREATE_MULTIPART_KEY;
+ context.setAction(S3GAction.CREATE_MULTIPART_KEY);
} else {
- s3GAction = S3GAction.CREATE_MULTIPART_KEY_BY_COPY;
+ context.setAction(S3GAction.CREATE_MULTIPART_KEY_BY_COPY);
}
// If uploadID is specified, it is a request for upload part
return createMultipartKey(volume, bucket, keyPath, length,
@@ -207,7 +243,7 @@ public Response put(
if (copyHeader != null) {
//Copy object, as copy source available.
- s3GAction = S3GAction.COPY_OBJECT;
+ context.setAction(S3GAction.COPY_OBJECT);
CopyObjectResponse copyObjectResponse = copyObject(volume,
bucketName, keyPath, replicationConfig, perf);
return Response.status(Status.OK).entity(copyObjectResponse).header(
@@ -227,7 +263,7 @@ public Response put(
(length == 0 || hasAmzDecodedLengthZero) &&
StringUtils.endsWith(keyPath, "/")
) {
- s3GAction = S3GAction.CREATE_DIRECTORY;
+ context.setAction(S3GAction.CREATE_DIRECTORY);
getClientProtocol()
.createDirectory(volume.getName(), bucketName, keyPath);
long metadataLatencyNs =
@@ -298,46 +334,14 @@ public Response put(
}
getMetrics().incPutKeySuccessLength(putLength);
perf.appendSizeBytes(putLength);
+ long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
+ perf.appendOpLatencyNanos(opLatencyNs);
return Response.ok()
.header(HttpHeaders.ETAG, wrapInQuotes(md5Hash))
.status(HttpStatus.SC_OK)
.build();
- } catch (OMException ex) {
- auditSuccess = false;
- auditWriteFailure(s3GAction, ex);
- if (taggingMarker != null) {
- getMetrics().updatePutObjectTaggingFailureStats(startNanos);
- } else if (copyHeader != null) {
- getMetrics().updateCopyObjectFailureStats(startNanos);
- } else {
- getMetrics().updateCreateKeyFailureStats(startNanos);
- }
- if (ex.getResult() == ResultCodes.NOT_A_FILE) {
- OS3Exception os3Exception = newError(INVALID_REQUEST, keyPath, ex);
- os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
- "when calling the PutObject/MPU PartUpload operation: " +
- OmConfig.Keys.ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" +
- " considered as Unix Paths. Path has Violated FS Semantics " +
- "which caused put operation to fail.");
- throw os3Exception;
- } else if (isAccessDenied(ex)) {
- throw newError(S3ErrorTable.ACCESS_DENIED, keyPath, ex);
- } else if (ex.getResult() == ResultCodes.QUOTA_EXCEEDED) {
- throw newError(S3ErrorTable.QUOTA_EXCEEDED, keyPath, ex);
- } else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
- throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
- } else if (ex.getResult() == ResultCodes.FILE_ALREADY_EXISTS) {
- throw newError(S3ErrorTable.NO_OVERWRITE, keyPath, ex);
- }
- throw ex;
- } catch (Exception ex) {
- auditSuccess = false;
- auditWriteFailure(s3GAction, ex);
- if (aclMarker != null) {
- getMetrics().updatePutObjectAclFailureStats(startNanos);
- } else if (taggingMarker != null) {
- getMetrics().updatePutObjectTaggingFailureStats(startNanos);
- } else if (copyHeader != null) {
+ } catch (IOException | RuntimeException ex) {
+ if (copyHeader != null) {
getMetrics().updateCopyObjectFailureStats(startNanos);
} else {
getMetrics().updateCreateKeyFailureStats(startNanos);
@@ -349,11 +353,6 @@ public Response put(
if (multiDigestInputStream != null) {
multiDigestInputStream.resetDigests();
}
- if (auditSuccess) {
-      long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
- perf.appendOpLatencyNanos(opLatencyNs);
- auditWriteSuccess(s3GAction, perf);
- }
}
}
@@ -1275,44 +1274,6 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
}
}
-  private Response putObjectTagging(OzoneBucket bucket, String keyName, InputStream body)
- throws IOException, OS3Exception {
- long startNanos = Time.monotonicNowNanos();
- S3Tagging tagging = null;
- try {
- tagging = new PutTaggingUnmarshaller().readFrom(body);
- tagging.validate();
- } catch (Exception ex) {
-      OS3Exception exception = S3ErrorTable.newError(S3ErrorTable.MALFORMED_XML, keyName);
-      exception.setErrorMessage(exception.getErrorMessage() + ". " + ex.getMessage());
- throw exception;
- }
-
- Map<String, String> tags = validateAndGetTagging(
-        tagging.getTagSet().getTags(), // Nullity check was done in previous parsing step
- Tag::getKey,
- Tag::getValue
- );
-
- try {
- bucket.putObjectTagging(keyName, tags);
- } catch (OMException ex) {
- if (ex.getResult() == ResultCodes.INVALID_REQUEST) {
- throw S3ErrorTable.newError(S3ErrorTable.INVALID_REQUEST, keyName);
- } else if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
- throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED, keyName);
- } else if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
- throw S3ErrorTable.newError(S3ErrorTable.NO_SUCH_KEY, keyName);
- } else if (ex.getResult() == ResultCodes.NOT_SUPPORTED_OPERATION) {
- // When putObjectTagging operation is applied on FSO directory
- throw S3ErrorTable.newError(S3ErrorTable.NOT_IMPLEMENTED, keyName);
- }
- throw ex;
- }
- getMetrics().updatePutObjectTaggingSuccessStats(startNanos);
- return Response.ok().build();
- }
-
private Response getObjectTagging(OzoneBucket bucket, String keyName) throws IOException {
long startNanos = Time.monotonicNowNanos();
@@ -1340,4 +1301,69 @@ private Response deleteObjectTagging(OzoneVolume volume, String bucketName, Stri
getMetrics().updateDeleteObjectTaggingSuccessStats(startNanos);
return Response.noContent().build();
}
+
+ /** Request context shared among {@code ObjectOperationHandler}s. */
+ final class ObjectRequestContext {
+ private final String bucketName;
+ private final long startNanos;
+ private final PerformanceStringBuilder perf;
+ private S3GAction action;
+ private OzoneVolume volume;
+ private OzoneBucket bucket;
+
+    /** @param action best guess on action based on request method, may be refined later by handlers */
+ ObjectRequestContext(S3GAction action, String bucketName) {
+ this.action = action;
+ this.bucketName = bucketName;
+ this.startNanos = Time.monotonicNowNanos();
+ this.perf = new PerformanceStringBuilder();
+ }
+
+ long getStartNanos() {
+ return startNanos;
+ }
+
+ PerformanceStringBuilder getPerf() {
+ return perf;
+ }
+
+ String getBucketName() {
+ return bucketName;
+ }
+
+ OzoneVolume getVolume() throws IOException {
+ if (volume == null) {
+ volume = ObjectEndpoint.this.getVolume();
+ }
+ return volume;
+ }
+
+ OzoneBucket getBucket() throws IOException {
+ if (bucket == null) {
+ bucket = getVolume().getBucket(bucketName);
+ }
+ return bucket;
+ }
+
+ S3GAction getAction() {
+ return action;
+ }
+
+ void setAction(S3GAction action) {
+ this.action = action;
+ }
+
+ /**
+     * This method should be called by each handler with the {@code S3GAction} decided based on request parameters,
+     * {@code null} if it does not handle the request. {@code action} is stored, if not null, for use in audit logging.
+     * @param a action as determined by handler
+     * @return true if handler should ignore the request (i.e. if {@code null} is passed) */
+ boolean ignore(@Nullable S3GAction a) {
+ final boolean ignore = a == null;
+ if (!ignore) {
+ setAction(a);
+ }
+ return ignore;
+ }
+ }
}
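ObjectRequestContext memoizes getVolume() and getBucket(), so however many handlers in the chain touch the bucket, the lookup happens at most once per request and later handlers reuse the cached value. The caching shape in isolation (a generic sketch, not code from this commit):

    import java.io.IOException;

    // Memoized lookup in the style of ObjectRequestContext.getBucket().
    final class CachedLookup<T> {
      interface Loader<T> {
        T load() throws IOException;
      }

      private final Loader<T> loader;
      private T value;

      CachedLookup(Loader<T> loader) {
        this.loader = loader;
      }

      T get() throws IOException {
        if (value == null) {      // first caller pays for the remote lookup
          value = loader.load();  // subsequent callers get the cached value
        }
        return value;
      }
    }

The context keeps the two lookups chained rather than independent: getBucket() resolves getVolume() first, so a handler that only asks for the bucket still populates both caches.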
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectOperationHandler.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectOperationHandler.java
new file mode 100644
index 00000000000..82ddda475c3
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectOperationHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint.ObjectRequestContext;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+
+/** Interface for handling object operations using chain of responsibility pattern. */
+abstract class ObjectOperationHandler extends EndpointBase {
+
+  Response handleDeleteRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ return null;
+ }
+
+  Response handleGetRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ return null;
+ }
+
+  Response handleHeadRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ return null;
+ }
+
+  Response handlePutRequest(ObjectRequestContext context, String keyName, InputStream body)
+ throws IOException, OS3Exception {
+ return null;
+ }
+
+ ObjectOperationHandler copyDependenciesFrom(EndpointBase other) {
+ other.copyDependenciesTo(this);
+ return this;
+ }
+}
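A null return from any handle*Request method means "not handled here"; the chain then consults the next handler. A new object operation plugs in by overriding only the verbs it serves. For illustration, a hypothetical handler for a ?retention query parameter might look like this (purely a sketch, not part of this commit; the retention parameter and the reuse of an existing S3GAction constant are assumptions):

    import java.io.IOException;
    import java.io.InputStream;
    import javax.ws.rs.core.Response;
    import org.apache.hadoop.ozone.audit.S3GAction;
    import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint.ObjectRequestContext;
    import org.apache.hadoop.ozone.s3.exception.OS3Exception;

    /** Hypothetical handler, shown only to illustrate the contract. */
    class ObjectRetentionHandler extends ObjectOperationHandler {

      @Override
      Response handlePutRequest(ObjectRequestContext context, String keyName, InputStream body)
          throws IOException, OS3Exception {
        // getAction() is null unless the request carries ?retention, so the
        // chain silently skips this handler for every other PUT.
        if (context.ignore(getAction())) {
          return null;
        }
        // ... perform the operation against context.getBucket() ...
        return Response.ok().build();
      }

      private S3GAction getAction() {
        // "retention" is a hypothetical parameter; PUT_OBJECT_ACL merely stands
        // in because this sketch defines no new S3GAction constant.
        return queryParams().get("retention") == null ? null : S3GAction.PUT_OBJECT_ACL;
      }
    }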
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectOperationHandlerChain.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectOperationHandlerChain.java
new file mode 100644
index 00000000000..805d478353c
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectOperationHandlerChain.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint.ObjectRequestContext;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+
+/** Chain of responsibility for {@link ObjectOperationHandler}s. */
+final class ObjectOperationHandlerChain extends ObjectOperationHandler {
+
+ private final List<ObjectOperationHandler> handlers;
+
+ private ObjectOperationHandlerChain(List<ObjectOperationHandler> handlers) {
+ this.handlers = handlers;
+ }
+
+ @Override
+  Response handleDeleteRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ for (ObjectOperationHandler handler : handlers) {
+ Response response = handler.handleDeleteRequest(context, keyName);
+ if (response != null) {
+ return response;
+ }
+ }
+ return null;
+ }
+
+ @Override
+  Response handleGetRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ for (ObjectOperationHandler handler : handlers) {
+ Response response = handler.handleGetRequest(context, keyName);
+ if (response != null) {
+ return response;
+ }
+ }
+ return null;
+ }
+
+ @Override
+  Response handleHeadRequest(ObjectRequestContext context, String keyName) throws IOException, OS3Exception {
+ for (ObjectOperationHandler handler : handlers) {
+ Response response = handler.handleHeadRequest(context, keyName);
+ if (response != null) {
+ return response;
+ }
+ }
+ return null;
+ }
+
+ @Override
+  Response handlePutRequest(ObjectRequestContext context, String keyName, InputStream body)
+ throws IOException, OS3Exception {
+ for (ObjectOperationHandler handler : handlers) {
+ Response response = handler.handlePutRequest(context, keyName, body);
+ if (response != null) {
+ return response;
+ }
+ }
+ return null;
+ }
+
+ static Builder newBuilder(ObjectEndpoint endpoint) {
+ return new Builder(endpoint);
+ }
+
+ /** Builds {@code ObjectOperationHandlerChain}. */
+ static final class Builder {
+ private final List<ObjectOperationHandler> handlers = new LinkedList<>();
+ private final ObjectEndpoint endpoint;
+
+ private Builder(ObjectEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ /** Append {@code handler} to the list of delegates. */
+ Builder add(ObjectOperationHandler handler) {
+ handlers.add(handler.copyDependenciesFrom(endpoint));
+ return this;
+ }
+
+    /** Create {@code ObjectOperationHandlerChain} with the list of delegates. */
+ ObjectOperationHandler build() {
+ return new ObjectOperationHandlerChain(handlers)
+ .copyDependenciesFrom(endpoint);
+ }
+ }
+}
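The dispatch rule is simply "first non-null response wins": handlers run in the order the builder added them, and ObjectEndpoint itself is registered last as the terminal handler that always answers. A runnable toy version of that rule (illustrative, not test code from this commit):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public final class ChainDemo {

      // Mirrors ObjectOperationHandlerChain: stop at the first non-null response.
      static String dispatch(List<Function<String, String>> handlers, String request) {
        for (Function<String, String> handler : handlers) {
          String response = handler.apply(request);
          if (response != null) {
            return response;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        List<Function<String, String>> handlers = Arrays.asList(
            r -> r.endsWith("?acl") ? "acl-handler" : null,
            r -> r.endsWith("?tagging") ? "tagging-handler" : null,
            r -> "object-endpoint"); // terminal handler always answers

        System.out.println(dispatch(handlers, "key?tagging")); // tagging-handler
        System.out.println(dispatch(handlers, "key"));         // object-endpoint
      }
    }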
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectTaggingHandler.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectTaggingHandler.java
new file mode 100644
index 00000000000..fe6a57ab60d
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectTaggingHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.ozone.audit.S3GAction;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint.ObjectRequestContext;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.util.S3Consts;
+
+/** Handle requests for object tagging. */
+class ObjectTaggingHandler extends ObjectOperationHandler {
+
+ @Override
+  Response handlePutRequest(ObjectRequestContext context, String keyName, InputStream body) throws IOException, OS3Exception {
+ if (context.ignore(getAction())) {
+ return null;
+ }
+
+ try {
+ S3Tagging tagging;
+ try {
+ tagging = new PutTaggingUnmarshaller().readFrom(body);
+ tagging.validate();
+ } catch (Exception ex) {
+        OS3Exception exception = S3ErrorTable.newError(S3ErrorTable.MALFORMED_XML, keyName);
+        exception.setErrorMessage(exception.getErrorMessage() + ". " + ex.getMessage());
+ throw exception;
+ }
+
+ Map<String, String> tags = validateAndGetTagging(
+          tagging.getTagSet().getTags(), // Nullity check was done in previous parsing step
+ S3Tagging.Tag::getKey,
+ S3Tagging.Tag::getValue
+ );
+
+ context.getBucket().putObjectTagging(keyName, tags);
+
+ getMetrics().updatePutObjectTaggingSuccessStats(context.getStartNanos());
+
+ return Response.ok().build();
+ } catch (Exception e) {
+ getMetrics().updatePutObjectTaggingFailureStats(context.getStartNanos());
+ throw e;
+ }
+ }
+
+ private S3GAction getAction() {
+ if (queryParams().get(S3Consts.QueryParams.TAGGING) == null) {
+ return null;
+ }
+
+ switch (getContext().getMethod()) {
+ case HttpMethod.DELETE:
+ return S3GAction.DELETE_OBJECT_TAGGING;
+ case HttpMethod.GET:
+ return S3GAction.GET_OBJECT_TAGGING;
+ case HttpMethod.PUT:
+ return S3GAction.PUT_OBJECT_TAGGING;
+ default:
+ return null;
+ }
+ }
+}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBuilder.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBuilder.java
index 1323c13fc0c..d19390ef9e2 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBuilder.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBuilder.java
@@ -105,6 +105,10 @@ public T build() {
final OzoneConfiguration config = getConfig();
endpoint.setOzoneConfiguration(config != null ? config : new OzoneConfiguration());
+ if (httpHeaders == null) {
+ httpHeaders = mock(HttpHeaders.class);
+ }
+
endpoint.setContext(requestContext);
endpoint.setHeaders(httpHeaders);
endpoint.setRequestIdentifier(identifier);
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
index c82a0772c93..6d3180438e8 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
@@ -21,10 +21,13 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
@@ -97,12 +100,14 @@ public static Response putTagging(
String content
) throws IOException, OS3Exception {
subject.queryParamsForTest().set(S3Consts.QueryParams.TAGGING, "");
+ when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT);
+ setLengthHeader(subject, content);
+
if (content == null) {
- return subject.put(bucket, key, 0, null);
+ return subject.put(bucket, key, null);
} else {
- final long length = content.length();
try (ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes(UTF_8))) {
- return subject.put(bucket, key, length, body);
+ return subject.put(bucket, key, body);
}
}
}
@@ -120,13 +125,14 @@ public static Response put(
subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
}
subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER, partNumber);
+ when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT);
+ setLengthHeader(subject, content);
if (content == null) {
- return subject.put(bucket, key, 0, null);
+ return subject.put(bucket, key, null);
} else {
- final long length = content.length();
try (ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes(UTF_8))) {
- return subject.put(bucket, key, length, body);
+ return subject.put(bucket, key, body);
}
}
}
@@ -252,6 +258,12 @@ public static OS3Exception assertErrorResponse(OS3Exception expected, CheckedSup
return actual;
}
+ private static void setLengthHeader(ObjectEndpoint subject, String content) {
+ final long length = content != null ? content.length() : 0;
+    when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
+        .thenReturn(String.valueOf(length));
+ }
+
private EndpointTestUtils() {
// no instances
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index 00007d17c50..c2456dd068f 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -248,12 +248,13 @@ void testPutObjectWithSignedChunks() throws Exception {
@Test
public void testPutObjectMessageDigestResetDuringException() {
MessageDigest messageDigest = mock(MessageDigest.class);
- try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
+ try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class);
+ MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
// For example, EOFException during put-object due to client cancelling the operation before it completes
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class), anyLong(),
anyLong(), any(byte[].class)))
.thenThrow(IOException.class);
- when(objectEndpoint.getMD5DigestInstance()).thenReturn(messageDigest);
+      endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
assertThrows(IOException.class, () -> putObject(CONTENT).close());
@@ -369,9 +370,11 @@ public void testCopyObjectMessageDigestResetDuringException() throws Exception {
assertThat(keyDetails.getMetadata().get(OzoneConsts.ETAG)).isNotEmpty();
MessageDigest messageDigest = mock(MessageDigest.class);
- try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
+ try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class);
+ MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
// Add the mocked methods only during the copy request
- when(objectEndpoint.getMD5DigestInstance()).thenReturn(messageDigest);
+      endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
+      endpoint.when(() -> EndpointBase.parseSourceHeader(any())).thenCallRealMethod();
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class), anyLong(),
anyLong(), any(byte[].class)))
.thenThrow(IOException.class);
@@ -390,9 +393,8 @@ public void testCopyObjectMessageDigestResetDuringException() throws Exception {
@Test
public void testCopyObjectWithTags() throws Exception {
// Put object in to source bucket
- HttpHeaders headersForPut = newMockHttpHeaders();
+ HttpHeaders headersForPut = objectEndpoint.getHeaders();
when(headersForPut.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag2=value2");
- objectEndpoint.setHeaders(headersForPut);
String sourceKeyName = "sourceKey";
@@ -405,10 +407,9 @@ public void testCopyObjectWithTags() throws Exception {
// Copy object without x-amz-tagging-directive (default to COPY)
String destKey = "key=value/2";
- HttpHeaders headersForCopy = newMockHttpHeaders();
+ HttpHeaders headersForCopy = objectEndpoint.getHeaders();
when(headersForCopy.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(sourceKeyName));
- objectEndpoint.setHeaders(headersForCopy);
assertSucceeds(() -> putObject(DEST_BUCKET_NAME, destKey));
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 0f91ac6c0f3..4b4f97830d9 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -162,10 +162,11 @@ public void testPartUploadMessageDigestResetDuringException() throws IOException
MessageDigest sha256Digest = mock(MessageDigest.class);
when(sha256Digest.getAlgorithm()).thenReturn("SHA-256");
try (MockedStatic<IOUtils> ioutils = mockStatic(IOUtils.class);
-        MockedStatic<ObjectEndpointStreaming> streaming = mockStatic(ObjectEndpointStreaming.class)) {
+        MockedStatic<ObjectEndpointStreaming> streaming = mockStatic(ObjectEndpointStreaming.class);
+ MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
// Add the mocked methods only during part upload
- when(rest.getMD5DigestInstance()).thenReturn(messageDigest);
- when(rest.getSha256DigestInstance()).thenReturn(sha256Digest);
+    endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
+    endpoint.when(EndpointBase::getSha256DigestInstance).thenReturn(sha256Digest);
if (enableDataStream) {
streaming.when(() -> ObjectEndpointStreaming.createMultipartKey(any(), any(), anyLong(), anyInt(), any(),
anyInt(), any(), any(), any()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]