This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3dd7838b607 KAFKA-20639: Move EnvelopeUtils to server module (#22416)
3dd7838b607 is described below

commit 3dd7838b607255cc9a18484a75aaf33c6c59dbf3
Author: majialong <[email protected]>
AuthorDate: Mon Jun 1 17:37:25 2026 +0800

    KAFKA-20639: Move EnvelopeUtils to server module (#22416)
    
    Move `EnvelopeUtils` to the server module in Java and add unit tests.
    
    Reviewers: Mickael Maison <[email protected]>, Ken Huang 
<[email protected]>
---
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 .../main/scala/kafka/server/EnvelopeUtils.scala    | 143 -------------
 .../unit/kafka/network/RequestChannelTest.scala    |   2 +-
 .../org/apache/kafka/server/EnvelopeUtils.java     | 138 ++++++++++++
 .../org/apache/kafka/server/EnvelopeUtilsTest.java | 235 +++++++++++++++++++++
 5 files changed, 375 insertions(+), 145 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index ef82b0cfb84..a36672311fd 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -55,7 +55,7 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, 
BrokerRegistrationReply,
 import org.apache.kafka.network.Request
 import org.apache.kafka.raft.RaftManager
 import org.apache.kafka.security.DelegationTokenManager
-import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
+import org.apache.kafka.server.{ApiVersionManager, EnvelopeUtils, ProcessRole}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
 import org.apache.kafka.server.quota.ControllerMutationQuota
diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala 
b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
deleted file mode 100644
index 0d364c7fcf6..00000000000
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.net.{InetAddress, UnknownHostException}
-import java.nio.ByteBuffer
-import org.apache.kafka.common.errors.{InvalidRequestException, 
PrincipalDeserializationException, UnsupportedVersionException}
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, 
RequestHeader}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.network.Request
-import org.apache.kafka.network.metrics.RequestChannelMetrics
-
-import java.util.Optional
-import scala.jdk.OptionConverters.RichOptional
-
-
-object EnvelopeUtils {
-  def handleEnvelopeRequest(
-    request: Request,
-    requestChannelMetrics: RequestChannelMetrics,
-    handler: Request => Unit
-  ): Unit = {
-    val envelope = request.body(classOf[EnvelopeRequest])
-    val forwardedPrincipal = parseForwardedPrincipal(request.context, 
envelope.requestPrincipal)
-    val forwardedClientAddress = 
parseForwardedClientAddress(envelope.clientAddress)
-
-    val forwardedRequestBuffer = envelope.requestData.duplicate()
-    val forwardedRequestHeader = 
parseForwardedRequestHeader(forwardedRequestBuffer)
-
-    val forwardedApi = forwardedRequestHeader.apiKey
-    if (!forwardedApi.forwardable) {
-      throw new InvalidRequestException(s"API $forwardedApi is not enabled or 
is not eligible for forwarding")
-    }
-
-    val forwardedContext = new RequestContext(
-      forwardedRequestHeader,
-      request.context.connectionId,
-      forwardedClientAddress,
-      forwardedPrincipal,
-      request.context.listenerName,
-      request.context.securityProtocol,
-      ClientInformation.EMPTY,
-      request.context.fromPrivilegedListener
-    )
-
-    val forwardedRequest = parseForwardedRequest(
-      request,
-      forwardedContext,
-      forwardedRequestBuffer,
-      requestChannelMetrics
-    )
-    handler(forwardedRequest)
-  }
-
-  private def parseForwardedClientAddress(
-    address: Array[Byte]
-  ): InetAddress = {
-    try {
-      InetAddress.getByAddress(address)
-    } catch {
-      case e: UnknownHostException =>
-        throw new InvalidRequestException("Failed to parse client address from 
envelope", e)
-    }
-  }
-
-  private def parseForwardedRequest(
-    envelope: Request,
-    forwardedContext: RequestContext,
-    buffer: ByteBuffer,
-    requestChannelMetrics: RequestChannelMetrics
-  ): Request = {
-    try {
-      val forwardedRequest = new Request(
-        envelope.processor,
-        forwardedContext,
-        envelope.startTimeNanos,
-        envelope.memoryPool,
-        buffer,
-        requestChannelMetrics,
-        Optional.of(envelope)
-      )
-      // set the dequeue time of forwardedRequest as the value of envelope 
request
-      
forwardedRequest.requestDequeueTimeNanos(envelope.requestDequeueTimeNanos)
-      forwardedRequest
-    } catch {
-      case e: InvalidRequestException =>
-        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
-        // The purpose is to disambiguate structural errors in the envelope 
request
-        // itself, such as an invalid client address.
-        throw new UnsupportedVersionException(s"Failed to parse forwarded 
request " +
-          s"with header ${forwardedContext.header}", e)
-    }
-  }
-
-  private def parseForwardedRequestHeader(
-    buffer: ByteBuffer
-  ): RequestHeader = {
-    try {
-      RequestHeader.parse(buffer)
-    } catch {
-      case e: InvalidRequestException =>
-        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
-        // The purpose is to disambiguate structural errors in the envelope 
request
-        // itself, such as an invalid client address.
-        throw new UnsupportedVersionException("Failed to parse request header 
from envelope", e)
-    }
-  }
-
-  private def parseForwardedPrincipal(
-    envelopeContext: RequestContext,
-    principalBytes: Array[Byte]
-  ): KafkaPrincipal = {
-    envelopeContext.principalSerde.toScala match {
-      case Some(serde) =>
-        try {
-          serde.deserialize(principalBytes)
-        } catch {
-          case e: Exception =>
-            throw new PrincipalDeserializationException("Failed to deserialize 
client principal from envelope", e)
-        }
-
-      case None =>
-        throw new PrincipalDeserializationException("Could not deserialize 
principal since " +
-          "no `KafkaPrincipalSerde` has been defined")
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala 
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 1c0cbe74fdb..f896ae2a7df 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -18,7 +18,6 @@
 package kafka.network
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import kafka.server.EnvelopeUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, 
SslConfigs, TopicConfig}
@@ -35,6 +34,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.utils.internals.SecurityUtils
 import org.apache.kafka.network.{Request, RequestConvertToJson}
 import org.apache.kafka.network.metrics.RequestChannelMetrics
+import org.apache.kafka.server.EnvelopeUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
diff --git a/server/src/main/java/org/apache/kafka/server/EnvelopeUtils.java 
b/server/src/main/java/org/apache/kafka/server/EnvelopeUtils.java
new file mode 100644
index 00000000000..21a4a00aed3
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/EnvelopeUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.PrincipalDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.network.Request;
+import org.apache.kafka.network.metrics.RequestChannelMetrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public final class EnvelopeUtils {
+    private EnvelopeUtils() {
+    }
+
+    public static void handleEnvelopeRequest(
+        Request request,
+        RequestChannelMetrics requestChannelMetrics,
+        Consumer<Request> handler
+    ) {
+        EnvelopeRequest envelope = request.body(EnvelopeRequest.class);
+        KafkaPrincipal forwardedPrincipal = 
parseForwardedPrincipal(request.context(), envelope.requestPrincipal());
+        InetAddress forwardedClientAddress = 
parseForwardedClientAddress(envelope.clientAddress());
+
+        ByteBuffer forwardedRequestBuffer = envelope.requestData().duplicate();
+        RequestHeader forwardedRequestHeader = 
parseForwardedRequestHeader(forwardedRequestBuffer);
+
+        ApiKeys forwardedApi = forwardedRequestHeader.apiKey();
+        if (!forwardedApi.forwardable) {
+            throw new InvalidRequestException("API " + forwardedApi + " is not 
enabled or is not eligible for forwarding");
+        }
+
+        RequestContext forwardedContext = new RequestContext(
+            forwardedRequestHeader,
+            request.context().connectionId,
+            forwardedClientAddress,
+            forwardedPrincipal,
+            request.context().listenerName,
+            request.context().securityProtocol,
+            ClientInformation.EMPTY,
+            request.context().fromPrivilegedListener
+        );
+
+        Request forwardedRequest = parseForwardedRequest(
+            request,
+            forwardedContext,
+            forwardedRequestBuffer,
+            requestChannelMetrics
+        );
+        handler.accept(forwardedRequest);
+    }
+
+    private static InetAddress parseForwardedClientAddress(byte[] address) {
+        try {
+            return InetAddress.getByAddress(address);
+        } catch (UnknownHostException e) {
+            throw new InvalidRequestException("Failed to parse client address 
from envelope", e);
+        }
+    }
+
+    private static Request parseForwardedRequest(
+        Request envelope,
+        RequestContext forwardedContext,
+        ByteBuffer buffer,
+        RequestChannelMetrics requestChannelMetrics
+    ) {
+        try {
+            Request forwardedRequest = new Request(
+                envelope.processor(),
+                forwardedContext,
+                envelope.startTimeNanos(),
+                envelope.memoryPool(),
+                buffer,
+                requestChannelMetrics,
+                Optional.of(envelope)
+            );
+            // set the dequeue time of forwardedRequest as the value of 
envelope request
+            
forwardedRequest.requestDequeueTimeNanos(envelope.requestDequeueTimeNanos());
+            return forwardedRequest;
+        } catch (InvalidRequestException e) {
+            // We use UNSUPPORTED_VERSION if the embedded request cannot be 
parsed.
+            // The purpose is to disambiguate structural errors in the 
envelope request
+            // itself, such as an invalid client address.
+            throw new UnsupportedVersionException("Failed to parse forwarded 
request with header " + forwardedContext.header, e);
+        }
+    }
+
+    private static RequestHeader parseForwardedRequestHeader(ByteBuffer 
buffer) {
+        try {
+            return RequestHeader.parse(buffer);
+        } catch (InvalidRequestException e) {
+            // We use UNSUPPORTED_VERSION if the embedded request cannot be 
parsed.
+            // The purpose is to disambiguate structural errors in the 
envelope request
+            // itself, such as an invalid client address.
+            throw new UnsupportedVersionException("Failed to parse request 
header from envelope", e);
+        }
+    }
+
+    private static KafkaPrincipal parseForwardedPrincipal(
+        RequestContext envelopeContext,
+        byte[] principalBytes
+    ) {
+        KafkaPrincipalSerde principalSerde = 
envelopeContext.principalSerde.orElseThrow(() ->
+            new PrincipalDeserializationException("Could not deserialize 
principal since no `KafkaPrincipalSerde` has been defined"));
+
+        try {
+            return principalSerde.deserialize(principalBytes);
+        } catch (Exception e) {
+            throw new PrincipalDeserializationException("Failed to deserialize 
client principal from envelope", e);
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/EnvelopeUtilsTest.java 
b/server/src/test/java/org/apache/kafka/server/EnvelopeUtilsTest.java
new file mode 100644
index 00000000000..e365fab9575
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/EnvelopeUtilsTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.PrincipalDeserializationException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.internals.SecurityUtils;
+import org.apache.kafka.network.Request;
+import org.apache.kafka.network.metrics.RequestChannelMetrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class EnvelopeUtilsTest {
+    private static final String CLIENT_ID = "client-id";
+    private static final String CONNECTION_ID = "connection-id";
+    private static final KafkaPrincipal FORWARDED_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "forwarded");
+    private static final KafkaPrincipalSerde PRINCIPAL_SERDE = new 
KafkaPrincipalSerde() {
+        @Override
+        public byte[] serialize(KafkaPrincipal principal) {
+            return Utils.utf8(principal.toString());
+        }
+
+        @Override
+        public KafkaPrincipal deserialize(byte[] bytes) {
+            return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+        }
+    };
+
+    @Test
+    public void testHandleEnvelopeRequestBuildsForwardedRequest() {
+        Request envelope = buildEnvelopeRequest(createForwardableRequest());
+        envelope.requestDequeueTimeNanos(123L);
+
+        AtomicReference<Request> handledRequest = new AtomicReference<>();
+        EnvelopeUtils.handleEnvelopeRequest(envelope, 
mock(RequestChannelMetrics.class), handledRequest::set);
+
+        Request forwardedRequest = handledRequest.get();
+        assertEquals(ApiKeys.CREATE_TOPICS, 
forwardedRequest.header().apiKey());
+        assertEquals(CLIENT_ID, forwardedRequest.header().clientId());
+        assertEquals(CONNECTION_ID, forwardedRequest.context().connectionId);
+        assertEquals(FORWARDED_PRINCIPAL, 
forwardedRequest.context().principal);
+        assertEquals(InetAddress.getLoopbackAddress(), 
forwardedRequest.context().clientAddress);
+        assertTrue(forwardedRequest.isForwarded());
+        assertSame(envelope, forwardedRequest.envelope().orElseThrow());
+        assertEquals(envelope.requestDequeueTimeNanos(), 
forwardedRequest.requestDequeueTimeNanos());
+        assertInstanceOf(CreateTopicsRequest.class, 
forwardedRequest.body(AbstractRequest.class));
+    }
+
+    @Test
+    public void testNonForwardableApiIsRejected() {
+        Request envelope = buildEnvelopeRequest(createNonForwardableRequest());
+
+        InvalidRequestException exception = assertThrows(
+            InvalidRequestException.class,
+            () -> EnvelopeUtils.handleEnvelopeRequest(envelope, 
mock(RequestChannelMetrics.class), ignored -> { })
+        );
+        assertEquals("API METADATA is not enabled or is not eligible for 
forwarding", exception.getMessage());
+    }
+
+    @Test
+    public void testInvalidForwardedClientAddressIsRejected() {
+        Request envelope = buildEnvelopeRequest(
+            createForwardableRequest(),
+            Optional.of(PRINCIPAL_SERDE),
+            new byte[] {1, 2, 3}
+        );
+
+        InvalidRequestException exception = assertThrows(
+            InvalidRequestException.class,
+            () -> EnvelopeUtils.handleEnvelopeRequest(envelope, 
mock(RequestChannelMetrics.class), ignored -> { })
+        );
+        assertEquals("Failed to parse client address from envelope", 
exception.getMessage());
+    }
+
+    @Test
+    public void testMissingPrincipalSerdeIsRejected() {
+        Request envelope = buildEnvelopeRequest(
+            createForwardableRequest(),
+            Optional.empty(),
+            InetAddress.getLoopbackAddress().getAddress()
+        );
+
+        PrincipalDeserializationException exception = assertThrows(
+            PrincipalDeserializationException.class,
+            () -> EnvelopeUtils.handleEnvelopeRequest(envelope, 
mock(RequestChannelMetrics.class), ignored -> { })
+        );
+        assertEquals(
+            "Could not deserialize principal since no `KafkaPrincipalSerde` 
has been defined",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void testPrincipalDeserializationFailureIsRejected() {
+        KafkaPrincipalSerde failingSerde = new KafkaPrincipalSerde() {
+            @Override
+            public byte[] serialize(KafkaPrincipal principal) {
+                return Utils.utf8(principal.toString());
+            }
+
+            @Override
+            public KafkaPrincipal deserialize(byte[] bytes) {
+                throw new IllegalArgumentException("mock error");
+            }
+        };
+        Request envelope = buildEnvelopeRequest(
+            createForwardableRequest(),
+            Optional.of(failingSerde),
+            InetAddress.getLoopbackAddress().getAddress()
+        );
+
+        PrincipalDeserializationException exception = assertThrows(
+            PrincipalDeserializationException.class,
+            () -> EnvelopeUtils.handleEnvelopeRequest(envelope, 
mock(RequestChannelMetrics.class), ignored -> { })
+        );
+        assertEquals("Failed to deserialize client principal from envelope", 
exception.getMessage());
+    }
+
+    private static CreateTopicsRequest createForwardableRequest() {
+        CreateTopicsRequestData requestData = new CreateTopicsRequestData();
+        requestData.topics().add(new CreatableTopic()
+            .setName("topic")
+            .setReplicationFactor((short) -1)
+            .setNumPartitions(-1)
+        );
+        return new CreateTopicsRequest.Builder(requestData).build();
+    }
+
+    private static MetadataRequest createNonForwardableRequest() {
+        return new MetadataRequest.Builder(List.of("topic"), true).build();
+    }
+
+    private static Request buildEnvelopeRequest(AbstractRequest 
forwardedRequest) {
+        return buildEnvelopeRequest(
+            forwardedRequest,
+            Optional.of(PRINCIPAL_SERDE),
+            InetAddress.getLoopbackAddress().getAddress()
+        );
+    }
+
+    private static Request buildEnvelopeRequest(
+        AbstractRequest forwardedRequest,
+        Optional<KafkaPrincipalSerde> contextPrincipalSerde,
+        byte[] clientAddress
+    ) {
+        RequestHeader forwardedRequestHeader = new RequestHeader(
+            forwardedRequest.apiKey(),
+            forwardedRequest.version(),
+            CLIENT_ID,
+            1
+        );
+        ByteBuffer forwardedRequestBuffer = 
forwardedRequest.serializeWithHeader(forwardedRequestHeader);
+
+        RequestHeader envelopeRequestHeader = new RequestHeader(
+            ApiKeys.ENVELOPE,
+            ApiKeys.ENVELOPE.latestVersion(),
+            CLIENT_ID,
+            2
+        );
+        ByteBuffer envelopeRequestBuffer = new EnvelopeRequest.Builder(
+            forwardedRequestBuffer,
+            PRINCIPAL_SERDE.serialize(FORWARDED_PRINCIPAL),
+            clientAddress
+        ).build().serializeWithHeader(envelopeRequestHeader);
+
+        // Advance the buffer past the envelope header before constructing the 
Request.
+        RequestHeader.parse(envelopeRequestBuffer);
+
+        RequestContext envelopeContext = new RequestContext(
+            envelopeRequestHeader,
+            CONNECTION_ID,
+            InetAddress.getLoopbackAddress(),
+            Optional.empty(),
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false,
+            contextPrincipalSerde
+        );
+
+        return new Request(
+            1,
+            envelopeContext,
+            0,
+            MemoryPool.NONE,
+            envelopeRequestBuffer,
+            mock(RequestChannelMetrics.class),
+            Optional.empty()
+        );
+    }
+}

Reply via email to