This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3dd7838b607 KAFKA-20639: Move EnvelopeUtils to server module (#22416)
3dd7838b607 is described below
commit 3dd7838b607255cc9a18484a75aaf33c6c59dbf3
Author: majialong <[email protected]>
AuthorDate: Mon Jun 1 17:37:25 2026 +0800
KAFKA-20639: Move EnvelopeUtils to server module (#22416)
Move `EnvelopeUtils` to the server module in Java and add unit tests.
Reviewers: Mickael Maison <[email protected]>, Ken Huang <[email protected]>
---
.../main/scala/kafka/server/ControllerApis.scala | 2 +-
.../main/scala/kafka/server/EnvelopeUtils.scala | 143 -------------
.../unit/kafka/network/RequestChannelTest.scala | 2 +-
.../org/apache/kafka/server/EnvelopeUtils.java | 138 ++++++++++++
.../org/apache/kafka/server/EnvelopeUtilsTest.java | 235 +++++++++++++++++++++
5 files changed, 375 insertions(+), 145 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index ef82b0cfb84..a36672311fd 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -55,7 +55,7 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply,
import org.apache.kafka.network.Request
import org.apache.kafka.raft.RaftManager
import org.apache.kafka.security.DelegationTokenManager
-import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
+import org.apache.kafka.server.{ApiVersionManager, EnvelopeUtils, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
import org.apache.kafka.server.quota.ControllerMutationQuota
diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
deleted file mode 100644
index 0d364c7fcf6..00000000000
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.net.{InetAddress, UnknownHostException}
-import java.nio.ByteBuffer
-import org.apache.kafka.common.errors.{InvalidRequestException,
PrincipalDeserializationException, UnsupportedVersionException}
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext,
RequestHeader}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.network.Request
-import org.apache.kafka.network.metrics.RequestChannelMetrics
-
-import java.util.Optional
-import scala.jdk.OptionConverters.RichOptional
-
-
-object EnvelopeUtils {
- def handleEnvelopeRequest(
- request: Request,
- requestChannelMetrics: RequestChannelMetrics,
- handler: Request => Unit
- ): Unit = {
- val envelope = request.body(classOf[EnvelopeRequest])
- val forwardedPrincipal = parseForwardedPrincipal(request.context,
envelope.requestPrincipal)
- val forwardedClientAddress =
parseForwardedClientAddress(envelope.clientAddress)
-
- val forwardedRequestBuffer = envelope.requestData.duplicate()
- val forwardedRequestHeader =
parseForwardedRequestHeader(forwardedRequestBuffer)
-
- val forwardedApi = forwardedRequestHeader.apiKey
- if (!forwardedApi.forwardable) {
- throw new InvalidRequestException(s"API $forwardedApi is not enabled or
is not eligible for forwarding")
- }
-
- val forwardedContext = new RequestContext(
- forwardedRequestHeader,
- request.context.connectionId,
- forwardedClientAddress,
- forwardedPrincipal,
- request.context.listenerName,
- request.context.securityProtocol,
- ClientInformation.EMPTY,
- request.context.fromPrivilegedListener
- )
-
- val forwardedRequest = parseForwardedRequest(
- request,
- forwardedContext,
- forwardedRequestBuffer,
- requestChannelMetrics
- )
- handler(forwardedRequest)
- }
-
- private def parseForwardedClientAddress(
- address: Array[Byte]
- ): InetAddress = {
- try {
- InetAddress.getByAddress(address)
- } catch {
- case e: UnknownHostException =>
- throw new InvalidRequestException("Failed to parse client address from
envelope", e)
- }
- }
-
- private def parseForwardedRequest(
- envelope: Request,
- forwardedContext: RequestContext,
- buffer: ByteBuffer,
- requestChannelMetrics: RequestChannelMetrics
- ): Request = {
- try {
- val forwardedRequest = new Request(
- envelope.processor,
- forwardedContext,
- envelope.startTimeNanos,
- envelope.memoryPool,
- buffer,
- requestChannelMetrics,
- Optional.of(envelope)
- )
- // set the dequeue time of forwardedRequest as the value of envelope request
-
forwardedRequest.requestDequeueTimeNanos(envelope.requestDequeueTimeNanos)
- forwardedRequest
- } catch {
- case e: InvalidRequestException =>
- // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
- // The purpose is to disambiguate structural errors in the envelope request
- // itself, such as an invalid client address.
- throw new UnsupportedVersionException(s"Failed to parse forwarded
request " +
- s"with header ${forwardedContext.header}", e)
- }
- }
-
- private def parseForwardedRequestHeader(
- buffer: ByteBuffer
- ): RequestHeader = {
- try {
- RequestHeader.parse(buffer)
- } catch {
- case e: InvalidRequestException =>
- // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
- // The purpose is to disambiguate structural errors in the envelope request
- // itself, such as an invalid client address.
- throw new UnsupportedVersionException("Failed to parse request header
from envelope", e)
- }
- }
-
- private def parseForwardedPrincipal(
- envelopeContext: RequestContext,
- principalBytes: Array[Byte]
- ): KafkaPrincipal = {
- envelopeContext.principalSerde.toScala match {
- case Some(serde) =>
- try {
- serde.deserialize(principalBytes)
- } catch {
- case e: Exception =>
- throw new PrincipalDeserializationException("Failed to deserialize
client principal from envelope", e)
- }
-
- case None =>
- throw new PrincipalDeserializationException("Could not deserialize
principal since " +
- "no `KafkaPrincipalSerde` has been defined")
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 1c0cbe74fdb..f896ae2a7df 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -18,7 +18,6 @@
package kafka.network
import com.fasterxml.jackson.databind.ObjectMapper
-import kafka.server.EnvelopeUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs,
SslConfigs, TopicConfig}
@@ -35,6 +34,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.internals.SecurityUtils
import org.apache.kafka.network.{Request, RequestConvertToJson}
import org.apache.kafka.network.metrics.RequestChannelMetrics
+import org.apache.kafka.server.EnvelopeUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
diff --git a/server/src/main/java/org/apache/kafka/server/EnvelopeUtils.java b/server/src/main/java/org/apache/kafka/server/EnvelopeUtils.java
new file mode 100644
index 00000000000..21a4a00aed3
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/EnvelopeUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.PrincipalDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.network.Request;
+import org.apache.kafka.network.metrics.RequestChannelMetrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public final class EnvelopeUtils {
+ private EnvelopeUtils() {
+ }
+
+ public static void handleEnvelopeRequest(
+ Request request,
+ RequestChannelMetrics requestChannelMetrics,
+ Consumer<Request> handler
+ ) {
+ EnvelopeRequest envelope = request.body(EnvelopeRequest.class);
+ KafkaPrincipal forwardedPrincipal =
parseForwardedPrincipal(request.context(), envelope.requestPrincipal());
+ InetAddress forwardedClientAddress =
parseForwardedClientAddress(envelope.clientAddress());
+
+ ByteBuffer forwardedRequestBuffer = envelope.requestData().duplicate();
+ RequestHeader forwardedRequestHeader =
parseForwardedRequestHeader(forwardedRequestBuffer);
+
+ ApiKeys forwardedApi = forwardedRequestHeader.apiKey();
+ if (!forwardedApi.forwardable) {
+ throw new InvalidRequestException("API " + forwardedApi + " is not
enabled or is not eligible for forwarding");
+ }
+
+ RequestContext forwardedContext = new RequestContext(
+ forwardedRequestHeader,
+ request.context().connectionId,
+ forwardedClientAddress,
+ forwardedPrincipal,
+ request.context().listenerName,
+ request.context().securityProtocol,
+ ClientInformation.EMPTY,
+ request.context().fromPrivilegedListener
+ );
+
+ Request forwardedRequest = parseForwardedRequest(
+ request,
+ forwardedContext,
+ forwardedRequestBuffer,
+ requestChannelMetrics
+ );
+ handler.accept(forwardedRequest);
+ }
+
+ private static InetAddress parseForwardedClientAddress(byte[] address) {
+ try {
+ return InetAddress.getByAddress(address);
+ } catch (UnknownHostException e) {
+ throw new InvalidRequestException("Failed to parse client address
from envelope", e);
+ }
+ }
+
+ private static Request parseForwardedRequest(
+ Request envelope,
+ RequestContext forwardedContext,
+ ByteBuffer buffer,
+ RequestChannelMetrics requestChannelMetrics
+ ) {
+ try {
+ Request forwardedRequest = new Request(
+ envelope.processor(),
+ forwardedContext,
+ envelope.startTimeNanos(),
+ envelope.memoryPool(),
+ buffer,
+ requestChannelMetrics,
+ Optional.of(envelope)
+ );
+ // set the dequeue time of forwardedRequest as the value of envelope request
+
forwardedRequest.requestDequeueTimeNanos(envelope.requestDequeueTimeNanos());
+ return forwardedRequest;
+ } catch (InvalidRequestException e) {
+ // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
+ // The purpose is to disambiguate structural errors in the envelope request
+ // itself, such as an invalid client address.
+ throw new UnsupportedVersionException("Failed to parse forwarded
request with header " + forwardedContext.header, e);
+ }
+ }
+
+ private static RequestHeader parseForwardedRequestHeader(ByteBuffer
buffer) {
+ try {
+ return RequestHeader.parse(buffer);
+ } catch (InvalidRequestException e) {
+ // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
+ // The purpose is to disambiguate structural errors in the envelope request
+ // itself, such as an invalid client address.
+ throw new UnsupportedVersionException("Failed to parse request
header from envelope", e);
+ }
+ }
+
+ private static KafkaPrincipal parseForwardedPrincipal(
+ RequestContext envelopeContext,
+ byte[] principalBytes
+ ) {
+ KafkaPrincipalSerde principalSerde =
envelopeContext.principalSerde.orElseThrow(() ->
+ new PrincipalDeserializationException("Could not deserialize
principal since no `KafkaPrincipalSerde` has been defined"));
+
+ try {
+ return principalSerde.deserialize(principalBytes);
+ } catch (Exception e) {
+ throw new PrincipalDeserializationException("Failed to deserialize
client principal from envelope", e);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/EnvelopeUtilsTest.java b/server/src/test/java/org/apache/kafka/server/EnvelopeUtilsTest.java
new file mode 100644
index 00000000000..e365fab9575
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/EnvelopeUtilsTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.PrincipalDeserializationException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.internals.SecurityUtils;
+import org.apache.kafka.network.Request;
+import org.apache.kafka.network.metrics.RequestChannelMetrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class EnvelopeUtilsTest {
+ private static final String CLIENT_ID = "client-id";
+ private static final String CONNECTION_ID = "connection-id";
+ private static final KafkaPrincipal FORWARDED_PRINCIPAL = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "forwarded");
+ private static final KafkaPrincipalSerde PRINCIPAL_SERDE = new
KafkaPrincipalSerde() {
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) {
+ return Utils.utf8(principal.toString());
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) {
+ return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+ }
+ };
+
+ @Test
+ public void testHandleEnvelopeRequestBuildsForwardedRequest() {
+ Request envelope = buildEnvelopeRequest(createForwardableRequest());
+ envelope.requestDequeueTimeNanos(123L);
+
+ AtomicReference<Request> handledRequest = new AtomicReference<>();
+ EnvelopeUtils.handleEnvelopeRequest(envelope,
mock(RequestChannelMetrics.class), handledRequest::set);
+
+ Request forwardedRequest = handledRequest.get();
+ assertEquals(ApiKeys.CREATE_TOPICS,
forwardedRequest.header().apiKey());
+ assertEquals(CLIENT_ID, forwardedRequest.header().clientId());
+ assertEquals(CONNECTION_ID, forwardedRequest.context().connectionId);
+ assertEquals(FORWARDED_PRINCIPAL,
forwardedRequest.context().principal);
+ assertEquals(InetAddress.getLoopbackAddress(),
forwardedRequest.context().clientAddress);
+ assertTrue(forwardedRequest.isForwarded());
+ assertSame(envelope, forwardedRequest.envelope().orElseThrow());
+ assertEquals(envelope.requestDequeueTimeNanos(),
forwardedRequest.requestDequeueTimeNanos());
+ assertInstanceOf(CreateTopicsRequest.class,
forwardedRequest.body(AbstractRequest.class));
+ }
+
+ @Test
+ public void testNonForwardableApiIsRejected() {
+ Request envelope = buildEnvelopeRequest(createNonForwardableRequest());
+
+ InvalidRequestException exception = assertThrows(
+ InvalidRequestException.class,
+ () -> EnvelopeUtils.handleEnvelopeRequest(envelope,
mock(RequestChannelMetrics.class), ignored -> { })
+ );
+ assertEquals("API METADATA is not enabled or is not eligible for
forwarding", exception.getMessage());
+ }
+
+ @Test
+ public void testInvalidForwardedClientAddressIsRejected() {
+ Request envelope = buildEnvelopeRequest(
+ createForwardableRequest(),
+ Optional.of(PRINCIPAL_SERDE),
+ new byte[] {1, 2, 3}
+ );
+
+ InvalidRequestException exception = assertThrows(
+ InvalidRequestException.class,
+ () -> EnvelopeUtils.handleEnvelopeRequest(envelope,
mock(RequestChannelMetrics.class), ignored -> { })
+ );
+ assertEquals("Failed to parse client address from envelope",
exception.getMessage());
+ }
+
+ @Test
+ public void testMissingPrincipalSerdeIsRejected() {
+ Request envelope = buildEnvelopeRequest(
+ createForwardableRequest(),
+ Optional.empty(),
+ InetAddress.getLoopbackAddress().getAddress()
+ );
+
+ PrincipalDeserializationException exception = assertThrows(
+ PrincipalDeserializationException.class,
+ () -> EnvelopeUtils.handleEnvelopeRequest(envelope,
mock(RequestChannelMetrics.class), ignored -> { })
+ );
+ assertEquals(
+ "Could not deserialize principal since no `KafkaPrincipalSerde`
has been defined",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void testPrincipalDeserializationFailureIsRejected() {
+ KafkaPrincipalSerde failingSerde = new KafkaPrincipalSerde() {
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) {
+ return Utils.utf8(principal.toString());
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) {
+ throw new IllegalArgumentException("mock error");
+ }
+ };
+ Request envelope = buildEnvelopeRequest(
+ createForwardableRequest(),
+ Optional.of(failingSerde),
+ InetAddress.getLoopbackAddress().getAddress()
+ );
+
+ PrincipalDeserializationException exception = assertThrows(
+ PrincipalDeserializationException.class,
+ () -> EnvelopeUtils.handleEnvelopeRequest(envelope,
mock(RequestChannelMetrics.class), ignored -> { })
+ );
+ assertEquals("Failed to deserialize client principal from envelope",
exception.getMessage());
+ }
+
+ private static CreateTopicsRequest createForwardableRequest() {
+ CreateTopicsRequestData requestData = new CreateTopicsRequestData();
+ requestData.topics().add(new CreatableTopic()
+ .setName("topic")
+ .setReplicationFactor((short) -1)
+ .setNumPartitions(-1)
+ );
+ return new CreateTopicsRequest.Builder(requestData).build();
+ }
+
+ private static MetadataRequest createNonForwardableRequest() {
+ return new MetadataRequest.Builder(List.of("topic"), true).build();
+ }
+
+ private static Request buildEnvelopeRequest(AbstractRequest
forwardedRequest) {
+ return buildEnvelopeRequest(
+ forwardedRequest,
+ Optional.of(PRINCIPAL_SERDE),
+ InetAddress.getLoopbackAddress().getAddress()
+ );
+ }
+
+ private static Request buildEnvelopeRequest(
+ AbstractRequest forwardedRequest,
+ Optional<KafkaPrincipalSerde> contextPrincipalSerde,
+ byte[] clientAddress
+ ) {
+ RequestHeader forwardedRequestHeader = new RequestHeader(
+ forwardedRequest.apiKey(),
+ forwardedRequest.version(),
+ CLIENT_ID,
+ 1
+ );
+ ByteBuffer forwardedRequestBuffer =
forwardedRequest.serializeWithHeader(forwardedRequestHeader);
+
+ RequestHeader envelopeRequestHeader = new RequestHeader(
+ ApiKeys.ENVELOPE,
+ ApiKeys.ENVELOPE.latestVersion(),
+ CLIENT_ID,
+ 2
+ );
+ ByteBuffer envelopeRequestBuffer = new EnvelopeRequest.Builder(
+ forwardedRequestBuffer,
+ PRINCIPAL_SERDE.serialize(FORWARDED_PRINCIPAL),
+ clientAddress
+ ).build().serializeWithHeader(envelopeRequestHeader);
+
+ // Advance the buffer past the envelope header before constructing the Request.
+ RequestHeader.parse(envelopeRequestBuffer);
+
+ RequestContext envelopeContext = new RequestContext(
+ envelopeRequestHeader,
+ CONNECTION_ID,
+ InetAddress.getLoopbackAddress(),
+ Optional.empty(),
+ KafkaPrincipal.ANONYMOUS,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT,
+ ClientInformation.EMPTY,
+ false,
+ contextPrincipalSerde
+ );
+
+ return new Request(
+ 1,
+ envelopeContext,
+ 0,
+ MemoryPool.NONE,
+ envelopeRequestBuffer,
+ mock(RequestChannelMetrics.class),
+ Optional.empty()
+ );
+ }
+}