This is an automated email from the ASF dual-hosted git repository.

FrankYang0529 pushed a commit to branch KAFKA-10730
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4274566a3cd1bd9f5351b09bba52f1604924068c
Author: PoAn Yang <[email protected]>
AuthorDate: Fri May 9 13:38:03 2025 +0800

    KAFKA-10730: KafkaApis#handleProduceRequest should use auto-generated 
protocol
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../kafka/common/requests/ProduceResponse.java     | 65 ++-------------
 .../kafka/clients/producer/KafkaProducerTest.java  | 15 +++-
 .../clients/producer/internals/SenderTest.java     | 81 ++++++++++++++-----
 .../producer/internals/TransactionManagerTest.java | 15 +++-
 .../kafka/common/requests/ProduceResponseTest.java | 43 ++++++----
 .../kafka/common/requests/RequestResponseTest.java | 38 ++++++---
 .../transaction/TransactionStateManager.scala      | 16 ++--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 70 ++++++++++------
 .../main/scala/kafka/server/ReplicaManager.scala   | 49 ++++++-----
 .../kafka/server/LocalLeaderEndPointTest.scala     | 38 ++++-----
 .../AbstractCoordinatorConcurrencyTest.scala       | 18 ++++-
 .../transaction/TransactionStateManagerTest.scala  | 20 +++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 69 ++++++++++++----
 .../server/ReplicaManagerConcurrencyTest.scala     |  9 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 94 +++++++++++-----------
 .../jmh/producer/ProducerResponseBenchmark.java    | 23 +++---
 .../kafka/server/purgatory/DelayedProduce.java     | 24 +++---
 17 files changed, 405 insertions(+), 282 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 9d7817e4902..d1e688c2b8d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.ProduceResponseData;
-import org.apache.kafka.common.message.ProduceResponseData.LeaderIdAndEpoch;
+import org.apache.kafka.common.message.ProduceResponseData.NodeEndpoint;
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
@@ -29,7 +29,6 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * This wrapper supports both v0 and v8 of ProduceResponse.
@@ -65,42 +64,11 @@ public class ProduceResponse extends AbstractResponse {
         this.data = produceResponseData;
     }
 
-    /**
-     * Constructor for Version 0
-     * This is deprecated in favor of using the ProduceResponseData 
constructor, KafkaApis should switch to that
-     * in KAFKA-10730
-     * @param responses Produced data grouped by topic-partition
-     */
-    @Deprecated
-    public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses) 
{
-        this(responses, DEFAULT_THROTTLE_TIME, List.of());
+    public ProduceResponse(Map<TopicIdPartition, PartitionProduceResponse> 
responses, List<NodeEndpoint> nodeEndpoints, int throttleTimeMs) {
+        this(toData(responses, nodeEndpoints, throttleTimeMs));
     }
 
-    /**
-     * This is deprecated in favor of using the ProduceResponseData 
constructor, KafkaApis should switch to that
-     * in KAFKA-10730
-     * @param responses Produced data grouped by topic-partition
-     * @param throttleTimeMs Time in milliseconds the response was throttled
-     */
-    @Deprecated
-    public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this(toData(responses, throttleTimeMs, List.of()));
-    }
-
-    /**
-     * Constructor for the latest version
-     * This is deprecated in favor of using the ProduceResponseData 
constructor, KafkaApis should switch to that
-     * in KAFKA-10730
-     * @param responses Produced data grouped by topic-partition
-     * @param throttleTimeMs Time in milliseconds the response was throttled
-     * @param nodeEndpoints List of node endpoints
-     */
-    @Deprecated
-    public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, 
int throttleTimeMs, List<Node> nodeEndpoints) {
-        this(toData(responses, throttleTimeMs, nodeEndpoints));
-    }
-
-    private static ProduceResponseData toData(Map<TopicIdPartition, 
PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
+    private static ProduceResponseData toData(Map<TopicIdPartition, 
PartitionProduceResponse> responses, List<NodeEndpoint> nodeEndpoints, int 
throttleTimeMs) {
         ProduceResponseData data = new 
ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
         responses.forEach((tp, response) -> {
             ProduceResponseData.TopicProduceResponse tpr = 
data.responses().find(tp.topic(), tp.topicId());
@@ -108,28 +76,9 @@ public class ProduceResponse extends AbstractResponse {
                 tpr = new 
ProduceResponseData.TopicProduceResponse().setName(tp.topic()).setTopicId(tp.topicId());
                 data.responses().add(tpr);
             }
-            tpr.partitionResponses()
-                .add(new ProduceResponseData.PartitionProduceResponse()
-                    .setIndex(tp.partition())
-                    .setBaseOffset(response.baseOffset)
-                    .setLogStartOffset(response.logStartOffset)
-                    .setLogAppendTimeMs(response.logAppendTime)
-                    .setErrorMessage(response.errorMessage)
-                    .setErrorCode(response.error.code())
-                    .setCurrentLeader(response.currentLeader != null ? 
response.currentLeader : new LeaderIdAndEpoch())
-                    .setRecordErrors(response.recordErrors
-                        .stream()
-                        .map(e -> new 
ProduceResponseData.BatchIndexAndErrorMessage()
-                            .setBatchIndex(e.batchIndex)
-                            .setBatchIndexErrorMessage(e.message))
-                        .collect(Collectors.toList())));
+            tpr.partitionResponses().add(response);
         });
-        nodeEndpoints.forEach(endpoint -> data.nodeEndpoints()
-                .add(new ProduceResponseData.NodeEndpoint()
-                        .setNodeId(endpoint.id())
-                        .setHost(endpoint.host())
-                        .setPort(endpoint.port())
-                        .setRack(endpoint.rack())));
+        nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(endpoint));
         return data;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 98ffd66270c..61c15453f3b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -59,6 +59,7 @@ import 
org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -3005,11 +3006,17 @@ public class KafkaProducerTest {
         assertDoesNotThrow(() -> new KafkaProducer<>(configs, new 
StringSerializer(), new StringSerializer()).close());
     }
 
-    @SuppressWarnings("deprecation")
     private ProduceResponse produceResponse(TopicIdPartition topicIdPartition, 
long offset, Errors error, int throttleTimeMs, int logStartOffset) {
-        ProduceResponse.PartitionResponse resp = new 
ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, 
logStartOffset);
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = 
singletonMap(topicIdPartition, resp);
-        return new ProduceResponse(partResp, throttleTimeMs);
+        return new ProduceResponse(Map.of(
+            topicIdPartition,
+            new PartitionProduceResponse()
+                .setIndex(topicIdPartition.partition())
+                .setBaseOffset(offset)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(logStartOffset)
+                .setErrorCode(error.code())),
+            List.of(),
+            throttleTimeMs);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 52dd2e08e58..59ce8da1c6a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -70,6 +70,7 @@ import 
org.apache.kafka.common.record.internal.MutableRecordBatch;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
@@ -2448,9 +2449,17 @@ public class SenderTest {
             assertTrue(inflightBatch.isInflight(), "Batch should be marked 
inflight after being sent");
             assertTrue(client.isReady(node, time.milliseconds()), "Client 
ready status should be true");
 
-            Map<TopicIdPartition, ProduceResponse.PartitionResponse> 
responseMap = new HashMap<>();
-            responseMap.put(tpId, new 
ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
-            client.respond(new ProduceResponse(responseMap));
+            Map<TopicIdPartition, 
ProduceResponseData.PartitionProduceResponse> responseMap = new HashMap<>();
+            responseMap.put(
+                tpId,
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(tpId.partition())
+                    .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+                    .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+                    .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                    .setErrorCode(Errors.MESSAGE_TOO_LARGE.code())
+            );
+            client.respond(new ProduceResponse(responseMap, List.of(), 
AbstractResponse.DEFAULT_THROTTLE_TIME));
             sender.runOnce(); // split and reenqueue
             assertFalse(inflightBatch.isInflight(), "Batch should be marked as 
not inflight after being split and re-enqueued");
             assertEquals(2, txnManager.sequenceNumber(tpId.topicPartition()), 
"The next sequence should be 2");
@@ -2467,9 +2476,17 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.isReady(node, time.milliseconds()), "Client 
ready status should be true");
 
-            responseMap.put(tpId, new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+            responseMap.put(
+                tpId,
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(tpId.partition())
+                    .setBaseOffset(0)
+                    .setLogStartOffset(0)
+                    .setLogAppendTimeMs(0)
+                    .setErrorCode(Errors.NONE.code())
+            );
             client.respond(produceRequestMatcher(tpId.topicPartition(), 
producerIdAndEpoch, 0, txnManager.isTransactional()),
-                    new ProduceResponse(responseMap));
+                    new ProduceResponse(responseMap, List.of(), 
AbstractResponse.DEFAULT_THROTTLE_TIME));
 
             sender.runOnce(); // receive
             assertTrue(f1.isDone(), "The future should have been done.");
@@ -2484,9 +2501,17 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.isReady(node, time.milliseconds()), "Client 
ready status should be true");
 
-            responseMap.put(tpId, new 
ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
+            responseMap.put(
+                tpId,
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(tpId.partition())
+                    .setBaseOffset(1)
+                    .setLogStartOffset(0)
+                    .setLogAppendTimeMs(0)
+                    .setErrorCode(Errors.NONE.code())
+            );
             client.respond(produceRequestMatcher(tpId.topicPartition(), 
producerIdAndEpoch, 1, txnManager.isTransactional()),
-                    new ProduceResponse(responseMap));
+                    new ProduceResponse(responseMap, List.of(), 
AbstractResponse.DEFAULT_THROTTLE_TIME));
 
             sender.runOnce(); // receive
             assertTrue(f2.isDone(), "The future should have been done.");
@@ -2539,9 +2564,16 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one 
in-flight batch in accumulator");
 
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = 
new HashMap<>();
-        responseMap.put(new TopicIdPartition(TOPIC_ID, tp0), new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
-        client.respond(new ProduceResponse(responseMap));
+        Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> 
responseMap = Map.of(
+            new TopicIdPartition(TOPIC_ID, tp0),
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(tp0.partition())
+                .setBaseOffset(0)
+                .setLogStartOffset(0)
+                .setLogAppendTimeMs(0)
+                .setErrorCode(Errors.NONE.code())
+        );
+        client.respond(new ProduceResponse(responseMap, List.of(), 
AbstractResponse.DEFAULT_THROTTLE_TIME));
 
         time.sleep(deliveryTimeoutMs);
         sender.runOnce();  // receive first response
@@ -2719,9 +2751,16 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one 
in-flight batch in accumulator");
 
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = 
new HashMap<>();
-        responseMap.put(new TopicIdPartition(TOPIC_ID, tp0), new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
-        client.respond(new ProduceResponse(responseMap));
+        Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> 
responseMap = Map.of(
+            new TopicIdPartition(TOPIC_ID, tp0),
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(tp0.partition())
+                .setBaseOffset(0)
+                .setLogStartOffset(0)
+                .setLogAppendTimeMs(0)
+                .setErrorCode(Errors.NONE.code())
+        );
+        client.respond(new ProduceResponse(responseMap, List.of(), 
AbstractResponse.DEFAULT_THROTTLE_TIME));
 
         // Successfully expire both batches.
         time.sleep(deliveryTimeoutMs);
@@ -3756,12 +3795,18 @@ public class SenderTest {
                 null, MAX_BLOCK_TIMEOUT, time.milliseconds(), 
TestUtils.singletonCluster()).future;
     }
 
-    @SuppressWarnings("deprecation")
     private ProduceResponse produceResponse(TopicPartition tp, long offset, 
Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
-        ProduceResponse.PartitionResponse resp = new 
ProduceResponse.PartitionResponse(error, offset,
-                RecordBatch.NO_TIMESTAMP, logStartOffset, 
Collections.emptyList(), errorMessage);
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = 
Collections.singletonMap(new TopicIdPartition(TOPIC_ID, tp), resp);
-        return new ProduceResponse(partResp, throttleTimeMs);
+        return new ProduceResponse(Map.of(
+            new TopicIdPartition(TOPIC_ID, tp),
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(tp.partition())
+                .setBaseOffset(offset)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(logStartOffset)
+                .setErrorCode(error.code())
+                .setErrorMessage(errorMessage)),
+            List.of(),
+            throttleTimeMs);
     }
 
     private ProduceResponse produceResponse(Map<TopicPartition, 
OffsetAndError> responses) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a13e44a5e16..ccd9b713700 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -51,6 +51,7 @@ import 
org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.message.ProduceResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -4484,11 +4485,17 @@ public class TransactionManagerTest {
         return produceResponse(tp, offset, error, throttleTimeMs, 10);
     }
 
-    @SuppressWarnings("deprecation")
     private ProduceResponse produceResponse(TopicPartition tp, long offset, 
Errors error, int throttleTimeMs, int logStartOffset) {
-        ProduceResponse.PartitionResponse resp = new 
ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, 
logStartOffset);
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = 
singletonMap(new TopicIdPartition(TOPIC_ID, tp), resp);
-        return new ProduceResponse(partResp, throttleTimeMs);
+        return new ProduceResponse(Map.of(
+            new TopicIdPartition(TOPIC_ID, tp),
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(tp.partition())
+                .setBaseOffset(offset)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(logStartOffset)
+                .setErrorCode(error.code())),
+            List.of(),
+            throttleTimeMs);
     }
 
     private void initializeIdempotentProducerId(long producerId, short epoch) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
index 7a9a8e5ca25..b9f0682991d 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
@@ -26,8 +26,6 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,16 +36,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ProduceResponseTest {
 
-    @SuppressWarnings("deprecation")
     @Test
     public void produceResponseVersionTest() {
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData 
= new HashMap<>();
         Uuid topicId = Uuid.fromString("5JkYABorYD4w0AQXe9TvBG");
         TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, 0, 
"test");
-        responseData.put(topicIdPartition, new 
ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 
100));
-        ProduceResponse v0Response = new ProduceResponse(responseData);
-        ProduceResponse v1Response = new ProduceResponse(responseData, 10);
-        ProduceResponse v2Response = new ProduceResponse(responseData, 10);
+        Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> 
responseData = Map.of(
+            topicIdPartition,
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(0)
+                .setBaseOffset(10000)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(100)
+                .setErrorCode(Errors.NONE.code())
+        );
+        ProduceResponse v0Response = new ProduceResponse(responseData, 
List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME);
+        ProduceResponse v1Response = new ProduceResponse(responseData, 
List.of(), 10);
+        ProduceResponse v2Response = new ProduceResponse(responseData, 
List.of(), 10);
         assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be 
zero");
         assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 
10");
         assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 
10");
@@ -71,17 +75,26 @@ public class ProduceResponseTest {
     @SuppressWarnings("deprecation")
     @Test
     public void produceResponseRecordErrorsTest() {
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData 
= new HashMap<>();
         Uuid topicId = Uuid.fromString("4w0AQXe9TvBG5JkYABorYD");
         TopicIdPartition tp = new TopicIdPartition(topicId, 0, "test");
-        ProduceResponse.PartitionResponse partResponse = new 
ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100,
-                Collections.singletonList(new ProduceResponse.RecordError(3, 
"Record error")),
-                "Produce failed");
-        responseData.put(tp, partResponse);
+        Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> 
responseData = Map.of(
+            tp,
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(tp.partition())
+                .setBaseOffset(10000)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(100)
+                .setErrorCode(Errors.NONE.code())
+                .setRecordErrors(List.of(
+                    new ProduceResponseData.BatchIndexAndErrorMessage()
+                        .setBatchIndex(3)
+                        .setBatchIndexErrorMessage("Record error")
+                ))
+                .setErrorMessage("Produce failed")
+        );
 
         for (short version : PRODUCE.allVersions()) {
-            ProduceResponse response = new ProduceResponse(responseData);
+            ProduceResponse response = new ProduceResponse(responseData, 
List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME);
 
             ProduceResponse produceResponse = 
ProduceResponse.parse(response.serialize(version), version);
             ProduceResponseData.TopicProduceResponse topicProduceResponse = 
produceResponse.data().responses().iterator().next();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a87a835fac0..e6c478910e4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2543,21 +2543,35 @@ public class RequestResponseTest {
 
     @SuppressWarnings("deprecation")
     private ProduceResponse createProduceResponse() {
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData 
= new HashMap<>();
-        Uuid topicId = Uuid.fromString("0AQorYD4we9TvBGX5JkYAB");
-        responseData.put(new TopicIdPartition(topicId, 0, "test"), new 
ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
-        return new ProduceResponse(responseData, 0);
+        return new ProduceResponse(Map.of(
+            new TopicIdPartition(Uuid.fromString("0AQorYD4we9TvBGX5JkYAB"), 0, 
"test"),
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(0)
+                .setBaseOffset(10000)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(100)
+                .setErrorCode(Errors.NONE.code())),
+            List.of(),
+            0);
     }
 
-    @SuppressWarnings("deprecation")
     private ProduceResponse createProduceResponseWithErrorMessage() {
-        Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData 
= new HashMap<>();
-        Uuid topicId = Uuid.fromString("0AQorYD4we9TvBGX5JkYAB");
-        responseData.put(new TopicIdPartition(topicId, 0, "test"), new 
ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100, singletonList(new 
ProduceResponse.RecordError(0, "error message")),
-                "global error message"));
-        return new ProduceResponse(responseData, 0);
+        return new ProduceResponse(Map.of(
+            new TopicIdPartition(Uuid.fromString("0AQorYD4we9TvBGX5JkYAB"), 0, 
"test"),
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(0)
+                .setBaseOffset(10000)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(100)
+                .setErrorCode(Errors.NONE.code())
+                .setRecordErrors(List.of(
+                    new ProduceResponseData.BatchIndexAndErrorMessage()
+                        .setBatchIndex(0)
+                        .setBatchIndexErrorMessage("error message")
+                ))
+                .setErrorMessage("global error message")),
+            List.of(),
+            0);
     }
 
     private SaslHandshakeRequest createSaslHandshakeRequest(short version) {
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index fae1c857654..ced4770f032 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -26,12 +26,12 @@ import kafka.utils.Logging
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.ListTransactionsResponseData
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.{Avg, Max}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.record.internal.{FileRecords, MemoryRecords, 
MemoryRecordsBuilder, Record, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicIdPartition, 
TopicPartition}
@@ -259,7 +259,7 @@ class TransactionStateManager(brokerId: Int,
     expiredForPartition: Iterable[TransactionalIdCoordinatorEpochAndMetadata],
     tombstoneRecords: MemoryRecords
   ): Unit = {
-    def removeFromCacheCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+    def removeFromCacheCallback(responses: util.Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       responses.forEach { (topicPartition, response) =>
         inReadLock[Exception](stateLock, () => {
           transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
@@ -270,11 +270,11 @@ class TransactionStateManager(brokerId: Int,
                 if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
                   && txnMetadata.pendingState.filter(s => s == 
TransactionState.DEAD).isPresent
                   && txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                  && response.error == Errors.NONE) {
+                  && response.errorCode == Errors.NONE.code) {
                   
txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
                 } else {
                   warn(s"Failed to remove expired transactionalId: 
$transactionalId" +
-                    s" from cache. Tombstone append error code: 
${response.error}," +
+                    s" from cache. Tombstone append error code: 
${response.errorCode}," +
                     s" pendingState: ${txnMetadata.pendingState}, 
producerEpoch: ${txnMetadata.producerEpoch}," +
                     s" expected producerEpoch: 
${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
                     s" coordinatorEpoch: 
${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
@@ -671,7 +671,7 @@ class TransactionStateManager(brokerId: Int,
     val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
 
     // set the callback function to update transaction status in cache after 
log append completed
-    def updateCacheCallback(responseStatus: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+    def updateCacheCallback(responseStatus: util.Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || 
!responseStatus.containsKey(transactionStateTopicIdPartition))
         throw new IllegalStateException("Append status %s should only have one 
partition %s"
@@ -679,13 +679,13 @@ class TransactionStateManager(brokerId: Int,
 
       val status = responseStatus.get(transactionStateTopicIdPartition)
 
-      var responseError = if (status.error == Errors.NONE) {
+      var responseError = if (status.errorCode == Errors.NONE.code) {
         Errors.NONE
       } else {
-        debug(s"Appending $transactionalId's new metadata $newMetadata failed 
due to ${status.error.exceptionName}")
+        debug(s"Appending $transactionalId's new metadata $newMetadata failed 
due to ${Errors.forCode(status.errorCode).exceptionName}")
 
         // transform the log append error code to the corresponding 
coordinator error code
-        status.error match {
+        Errors.forCode(status.errorCode) match {
           case Errors.UNKNOWN_TOPIC_OR_PARTITION
                | Errors.NOT_ENOUGH_REPLICAS
                | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e923563ad0..9cc68b73142 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -40,6 +40,7 @@ import 
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
 import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset,
 OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
+import org.apache.kafka.common.message.ProduceResponseData.{NodeEndpoint, 
PartitionProduceResponse}
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -48,7 +49,6 @@ import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType._
@@ -79,7 +79,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.stream.Collectors
 import java.util.{Collections, Optional}
-import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
@@ -406,9 +405,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
-    val nonExistingTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
-    val invalidRequestResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, 
PartitionProduceResponse]()
+    val nonExistingTopicResponses = mutable.Map[TopicIdPartition, 
PartitionProduceResponse]()
+    val invalidRequestResponses = mutable.Map[TopicIdPartition, 
PartitionProduceResponse]()
     val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
     val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, 
ProduceRequestData.PartitionProduceData)]
 
@@ -423,7 +422,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         val topicPartition = new TopicPartition(topicName, partition.index())
         // To be compatible with the old version, only return UNKNOWN_TOPIC_ID 
if request version uses topicId, but the corresponding topic name can't be 
found.
         if (topicName.isEmpty && request.header.apiVersion > 12)
-          nonExistingTopicResponses += new TopicIdPartition(topicId, 
topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
+          nonExistingTopicResponses += new TopicIdPartition(topicId, 
topicPartition) ->
+            new PartitionProduceResponse()
+              .setIndex(topicPartition.partition)
+              .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+              .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+              .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
         else
           topicIdToPartitionData += new TopicIdPartition(topicId, 
topicPartition) -> partition
       }
@@ -437,44 +442,61 @@ class KafkaApis(val requestChannel: RequestChannel,
       // https://issues.apache.org/jira/browse/KAFKA-10698
       val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
       if (!authorizedTopics.contains(topicIdPartition.topic))
-        unauthorizedTopicResponses += topicIdPartition -> new 
PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+        unauthorizedTopicResponses += topicIdPartition ->
+  new PartitionProduceResponse()
+    .setIndex(topicIdPartition.partition)
+    .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+    .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+    .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+    .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
       else if (!metadataCache.contains(topicIdPartition.topicPartition))
-        nonExistingTopicResponses += topicIdPartition -> new 
PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        nonExistingTopicResponses += topicIdPartition ->
+          new PartitionProduceResponse()
+            .setIndex(topicIdPartition.partition)
+            .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+            .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+            .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
       else
         try {
           ProduceRequest.validateRecords(request.header.apiVersion, 
memoryRecords)
           authorizedRequestInfo += (topicIdPartition -> memoryRecords)
         } catch {
           case e: ApiException =>
-            invalidRequestResponses += topicIdPartition -> new 
PartitionResponse(Errors.forException(e))
+            invalidRequestResponses += topicIdPartition ->
+              new PartitionProduceResponse()
+                .setIndex(topicIdPartition.partition)
+                .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+                .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setErrorCode(Errors.forException(e).code)
         }
     }
 
-    // the callback for sending a produce response
-    // The construction of ProduceResponse is able to accept auto-generated 
protocol data so
-    // KafkaApis#handleProduceRequest should apply auto-generated protocol to 
avoid extra conversion.
-    // https://issues.apache.org/jira/browse/KAFKA-10730
-    @nowarn("cat=deprecation")
-    def sendResponseCallback(responseStatus: Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+    def sendResponseCallback(responseStatus: Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses 
++ nonExistingTopicResponses ++ invalidRequestResponses
       var errorInResponse = false
 
-      val nodeEndpoints = new mutable.HashMap[Int, Node]
+      val nodeEndpoints = new mutable.HashMap[Int, NodeEndpoint]
       mergedResponseStatus.foreachEntry { (topicIdPartition, status) =>
-        if (status.error != Errors.NONE) {
+        if (status.errorCode != Errors.NONE.code) {
           errorInResponse = true
           debug("Produce request with correlation id %d from client %s on 
partition %s failed due to %s".format(
             request.header.correlationId,
             request.header.clientId,
             topicIdPartition,
-            status.error.exceptionName))
+            Errors.forCode(status.errorCode).exceptionName))
 
           if (request.header.apiVersion >= 10) {
-            status.error match {
+            Errors.forCode(status.errorCode) match {
               case Errors.NOT_LEADER_OR_FOLLOWER =>
                 val leaderNode = 
getCurrentLeader(topicIdPartition.topicPartition(), 
request.context.listenerName)
                 leaderNode.node.foreach { node =>
-                  nodeEndpoints.put(node.id(), node)
+                  nodeEndpoints.put(node.id, new NodeEndpoint()
+                    .setNodeId(node.id)
+                    .setHost(node.host)
+                    .setPort(node.port)
+                    .setRack(node.rack))
                 }
                 status.currentLeader
                   .setLeaderId(leaderNode.leaderId)
@@ -511,21 +533,21 @@ class KafkaApis(val requestChannel: RequestChannel,
         // the producer client will know that some error has happened and will 
refresh its metadata
         if (errorInResponse) {
           val exceptionsSummary = mergedResponseStatus.map { case 
(topicPartition, status) =>
-            topicPartition -> status.error.exceptionName
+            topicPartition -> Errors.forCode(status.errorCode).exceptionName
           }.mkString(", ")
           info(
             s"Closing connection due to error during produce request with 
correlation id ${request.header.correlationId} " +
               s"from client id ${request.header.clientId} with ack=0\n" +
               s"Topic and partition to exceptions: $exceptionsSummary"
           )
-          requestChannel.closeConnection(request, new 
ProduceResponse(mergedResponseStatus.asJava).errorCounts)
+          requestChannel.closeConnection(request, new 
ProduceResponse(mergedResponseStatus.asJava, util.List.of, 
AbstractResponse.DEFAULT_THROTTLE_TIME).errorCounts)
         } else {
           // Note that although request throttling is exempt for acks == 0, 
the channel may be throttled due to
           // bandwidth quota violation.
           requestHelper.sendNoOpResponseExemptThrottle(request)
         }
       } else {
-        requestChannel.sendResponse(request, new 
ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, 
nodeEndpoints.values.toList.asJava), None)
+        requestChannel.sendResponse(request, new 
ProduceResponse(mergedResponseStatus.asJava, 
nodeEndpoints.values.toList.asJava, maxThrottleTimeMs), None)
       }
     }
 
@@ -1839,7 +1861,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestLocal = requestLocal,
             responseCallback = errors => {
               errors.forEach { (topicIdPartition, partitionResponse) =>
-                addResultAndMaybeComplete(topicIdPartition.topicPartition(), 
partitionResponse.error)
+                addResultAndMaybeComplete(topicIdPartition.topicPartition(), 
Errors.forCode(partitionResponse.errorCode))
               }
             },
             transactionVersion = markerTransactionVersion
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b109d8d411b..3aa997f97c7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -32,6 +32,7 @@ import 
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset,
 OffsetForLeaderTopicResult}
 import org.apache.kafka.common.message.{DescribeLogDirsResponseData, 
DescribeProducersResponseData}
+import 
org.apache.kafka.common.message.ProduceResponseData.{BatchIndexAndErrorMessage, 
PartitionProduceResponse}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
@@ -40,7 +41,6 @@ import 
org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
 import org.apache.kafka.common.replica._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.internals.Exit
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, 
RejectedExecutionException, TimeUnit}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import java.util.function.Consumer
+import java.util.stream.Collectors
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.FunctionConverters.enrichAsJavaConsumer
@@ -640,7 +641,7 @@ class ReplicaManager(val config: KafkaConfig,
                     internalTopicsAllowed: Boolean,
                     origin: AppendOrigin,
                     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
-                    responseCallback: util.Map[TopicIdPartition, 
PartitionResponse] => Unit,
+                    responseCallback: util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit,
                     recordValidationStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.noCaching,
                     verificationGuards: Map[TopicPartition, VerificationGuard] 
= Map.empty,
@@ -699,7 +700,7 @@ class ReplicaManager(val config: KafkaConfig,
                           internalTopicsAllowed: Boolean,
                           transactionalId: String,
                           entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],
-                          responseCallback: Map[TopicIdPartition, 
PartitionResponse] => Unit,
+                          responseCallback: Map[TopicIdPartition, 
PartitionProduceResponse] => Unit,
                           recordValidationStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                           requestLocal: RequestLocal = RequestLocal.noCaching,
                           transactionSupportedOperation: 
TransactionSupportedOperation): Unit = {
@@ -760,7 +761,7 @@ class ReplicaManager(val config: KafkaConfig,
 
       val preAppendPartitionResponses = 
buildProducePartitionStatus(errorResults).map { case (k, status) => k -> 
status.responseStatus }
 
-      def newResponseCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+      def newResponseCallback(responses: util.Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
         responseCallback(preAppendPartitionResponses ++ responses.asScala)
       }
 
@@ -838,14 +839,19 @@ class ReplicaManager(val config: KafkaConfig,
     results.map { case (topicIdPartition, result) =>
       topicIdPartition -> new ProducePartitionStatus(
         result.logAppendSummary.lastOffset + 1, // required offset
-        new PartitionResponse(
-          result.error,
-          result.logAppendSummary.firstOffset,
-          result.logAppendSummary.logAppendTime,
-          result.logAppendSummary.logStartOffset,
-          result.logAppendSummary.recordErrors,
-          result.errorMessage
-        )
+        new PartitionProduceResponse()
+          .setIndex(topicIdPartition.partition)
+          .setErrorCode(result.error.code)
+          .setErrorMessage(result.errorMessage)
+          .setBaseOffset(result.logAppendSummary.firstOffset)
+          .setLogAppendTimeMs(result.logAppendSummary.logAppendTime)
+          .setLogStartOffset(result.logAppendSummary.logStartOffset)
+          .setRecordErrors(result.logAppendSummary.recordErrors
+            .stream()
+            .map(e => new BatchIndexAndErrorMessage()
+              .setBatchIndex(e.batchIndex)
+              .setBatchIndexErrorMessage(e.message))
+            .collect(Collectors.toList()))
       )
     }
   }
@@ -881,7 +887,7 @@ class ReplicaManager(val config: KafkaConfig,
     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
     initialAppendResults: Map[TopicIdPartition, LogAppendResult],
     initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
+    responseCallback: util.Map[TopicIdPartition, PartitionProduceResponse] => 
Unit,
   ): Unit = {
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
initialAppendResults)) {
       // Create delayed produce operation
@@ -911,7 +917,7 @@ class ReplicaManager(val config: KafkaConfig,
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys.asJava)
     } else {
       // we can respond immediately
-      val produceResponseStatus = new util.HashMap[TopicIdPartition, 
PartitionResponse]
+      val produceResponseStatus = new util.HashMap[TopicIdPartition, 
PartitionProduceResponse]
       initialProduceStatus.foreach { case (k, status) => 
produceResponseStatus.put(k, status.responseStatus) }
       responseCallback(produceResponseStatus)
     }
@@ -919,16 +925,17 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def sendInvalidRequiredAcksResponse(
     entries: Map[TopicIdPartition, MemoryRecords],
-    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit): 
Unit = {
+    responseCallback: util.Map[TopicIdPartition, PartitionProduceResponse] => 
Unit): Unit = {
     // If required.acks is outside accepted range, something is wrong with the 
client
     // Just return an error and don't handle the request at all
-    val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+    val responseStatus = new util.HashMap[TopicIdPartition, 
PartitionProduceResponse]
     entries.foreach { case(topicIdPartition, _) =>
-        responseStatus.put(topicIdPartition, new PartitionResponse(
-          Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
-          RecordBatch.NO_TIMESTAMP,
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset)
+        responseStatus.put(topicIdPartition, new PartitionProduceResponse()
+          .setIndex(topicIdPartition.partition())
+          .setBaseOffset(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset)
+          
.setLogStartOffset(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset)
+          .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+          .setErrorCode(Errors.INVALID_REQUIRED_ACKS.code())
         )
     }
     responseCallback(responseStatus)
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 71f33100094..d4047f51e9e 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -24,11 +24,11 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
PartitionChangeRecord, PartitionRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.KRaftMetadataCache
@@ -133,23 +133,23 @@ class LocalLeaderEndPointTest extends Logging {
   @Test
   def testFetchLatestOffset(): Unit = {
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     assertEquals(new OffsetAndEpoch(3L, 0), 
endPoint.fetchLatestOffset(topicPartition, 0))
     bumpLeaderEpoch()
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     assertEquals(new OffsetAndEpoch(6L, 1), 
endPoint.fetchLatestOffset(topicPartition, 7))
   }
 
   @Test
   def testFetchEarliestOffset(): Unit = {
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     assertEquals(new OffsetAndEpoch(0L, 0), 
endPoint.fetchEarliestOffset(topicPartition, 0))
 
     bumpLeaderEpoch()
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ 
=> ())
     assertEquals(new OffsetAndEpoch(3L, 1), 
endPoint.fetchEarliestOffset(topicPartition, 7))
   }
@@ -157,12 +157,12 @@ class LocalLeaderEndPointTest extends Logging {
   @Test
   def testFetchEarliestLocalOffset(): Unit = {
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     assertEquals(new OffsetAndEpoch(0L, 0), 
endPoint.fetchEarliestLocalOffset(topicPartition, 0))
 
     bumpLeaderEpoch()
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     
replicaManager.logManager.getLog(topicPartition).ifPresent(_.updateLocalLogStartOffset(3))
     assertEquals(new OffsetAndEpoch(0L, 0), 
endPoint.fetchEarliestOffset(topicPartition, 7))
     assertEquals(new OffsetAndEpoch(3L, 1), 
endPoint.fetchEarliestLocalOffset(topicPartition, 7))
@@ -171,7 +171,7 @@ class LocalLeaderEndPointTest extends Logging {
   @Test
   def testFetchEpochEndOffsets(): Unit = {
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
 
     var result = endPoint.fetchEpochEndOffsets(JMap.of(
       topicPartition, new OffsetForLeaderPartition()
@@ -195,7 +195,7 @@ class LocalLeaderEndPointTest extends Logging {
     assertEquals(2, 
replicaManager.getPartitionOrException(topicPartition).getLeaderEpoch)
 
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
 
     result = endPoint.fetchEpochEndOffsets(JMap.of(
       topicPartition, new OffsetForLeaderPartition()
@@ -251,7 +251,7 @@ class LocalLeaderEndPointTest extends Logging {
   def testEarliestPendingUploadOffsetWhenNoSegmentsUploaded(): Unit = {
     // Append some records; no remote upload happened yet
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
 
     val expected = endPoint.fetchEarliestOffset(topicPartition, 0)
     val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
@@ -261,7 +261,7 @@ class LocalLeaderEndPointTest extends Logging {
   @Test
   def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStart(): Unit = {
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
 
     // Bump epoch and advance local log start offset without changing log 
start offset
     bumpLeaderEpoch()
@@ -274,7 +274,7 @@ class LocalLeaderEndPointTest extends Logging {
   @Test
   def testEarliestPendingUploadOffsetWhenHighestRemoteOffsetKnown(): Unit = {
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
 
     // Highest remote is 1 => earliest pending should be max(1+1, logStart)
     val log = 
replicaManager.getPartitionOrException(topicPartition).localLogOrException
@@ -291,7 +291,7 @@ class LocalLeaderEndPointTest extends Logging {
   def 
testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStartWithKnownRemoteOffset():
 Unit = {
     // Append records to create initial log state
     appendRecords(replicaManager, topicIdPartition, records)
-      .onFire(response => assertEquals(Errors.NONE, response.error))
+      .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
 
     val log = 
replicaManager.getPartitionOrException(topicPartition).localLogOrException
 
@@ -315,7 +315,7 @@ class LocalLeaderEndPointTest extends Logging {
     // Append 12 records (offsets 0-11)
     for (_ <- 1 to 4) {
       appendRecords(replicaManager, topicIdPartition, records)
-        .onFire(response => assertEquals(Errors.NONE, response.error))
+        .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     }
 
     // Delete records to advance logStartOffset to 10
@@ -342,7 +342,7 @@ class LocalLeaderEndPointTest extends Logging {
     // Append 18 records (offsets 0-17)
     for (_ <- 1 to 6) {
       appendRecords(replicaManager, topicIdPartition, records)
-        .onFire(response => assertEquals(Errors.NONE, response.error))
+        .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     }
 
     // Delete records to advance logStartOffset to 10
@@ -369,7 +369,7 @@ class LocalLeaderEndPointTest extends Logging {
     // Append 12 records (offsets 0-11)
     for (_ <- 1 to 4) {
       appendRecords(replicaManager, topicIdPartition, records)
-        .onFire(response => assertEquals(Errors.NONE, response.error))
+        .onFire(response => assertEquals(Errors.NONE.code, response.errorCode))
     }
 
     // Delete records to advance both logStartOffset and localLogStartOffset 
to 10
@@ -430,9 +430,9 @@ class LocalLeaderEndPointTest extends Logging {
                             partition: TopicIdPartition,
                             records: MemoryRecords,
                             origin: AppendOrigin = AppendOrigin.CLIENT,
-                            requiredAcks: Short = -1): 
CallbackResult[PartitionResponse] = {
-    val result = new CallbackResult[PartitionResponse]()
-    def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): 
Unit = {
+                            requiredAcks: Short = -1): 
CallbackResult[PartitionProduceResponse] = {
+    val result = new CallbackResult[PartitionProduceResponse]()
+    def appendCallback(responses: JMap[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       val response = responses.get(partition)
       assertNotNull(response)
       result.fire(response)
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index abb3f6d4601..e5d1e5cb5d5 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -26,10 +26,10 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
 import kafka.utils._
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.purgatory.DelayedProduce.ProducePartitionStatus
@@ -216,7 +216,7 @@ object AbstractCoordinatorConcurrencyTest {
                                internalTopicsAllowed: Boolean,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],
-                               responseCallback: 
java.util.Map[TopicIdPartition, PartitionResponse] => Unit,
+                               responseCallback: 
java.util.Map[TopicIdPartition, PartitionProduceResponse] => Unit,
                                processingStatsCallback: Map[TopicIdPartition, 
RecordValidationStats] => Unit = _ => (),
                                requestLocal: RequestLocal = 
RequestLocal.noCaching,
                                verificationGuards: Map[TopicPartition, 
VerificationGuard] = Map.empty,
@@ -226,7 +226,12 @@ object AbstractCoordinatorConcurrencyTest {
         return
       val produceStatus = entriesPerPartition.map {
         case (tp, _) =>
-          (tp, new ProducePartitionStatus(0L, new 
PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+          (tp, new ProducePartitionStatus(0L, new PartitionProduceResponse()
+            .setIndex(tp.partition)
+            .setErrorCode(Errors.NONE.code)
+            .setBaseOffset(0)
+            .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+            .setLogStartOffset(0)))
       }.asJava
 
       // It is safe to set the third parameter to null because it is only used 
in tryComplete().
@@ -243,7 +248,12 @@ object AbstractCoordinatorConcurrencyTest {
         override def onComplete(): Unit = {
           responseCallback(entriesPerPartition.map {
             case (tp, _) =>
-              (tp, new PartitionResponse(Errors.NONE, 0L, 
RecordBatch.NO_TIMESTAMP, 0L))
+              (tp, new PartitionProduceResponse()
+                .setIndex(tp.partition)
+                .setErrorCode(Errors.NONE.code)
+                .setBaseOffset(0)
+                .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                .setLogStartOffset(0))
           }.asJava)
         }
       }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 84c58f40c87..18e97da79c1 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -26,11 +26,11 @@ import org.apache.kafka.common.{TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.InvalidRegularExpression
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, 
Metrics}
 import org.apache.kafka.common.protocol.{Errors, MessageUtil}
 import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.coordinator.transaction.{TransactionLog, 
TransactionMetadata, TransactionState, TxnTransitMetadata}
@@ -1106,7 +1106,7 @@ class TransactionStateManagerTest {
     capturedAppends: mutable.Map[TopicIdPartition, 
mutable.Buffer[MemoryRecords]]
   ): Unit = {
     val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] 
=> Unit])
+    val callbackCapture: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit])
 
     when(replicaManager.appendRecords(
       anyLong(),
@@ -1129,7 +1129,12 @@ class TransactionStateManagerTest {
 
         batches += records
 
-        topicPartition -> new PartitionResponse(appendError, 0L, 
RecordBatch.NO_TIMESTAMP, 0L)
+        topicPartition -> new PartitionProduceResponse()
+          .setIndex(topicPartition.partition)
+          .setErrorCode(appendError.code)
+          .setBaseOffset(0)
+          .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+          .setLogStartOffset(0)
       }.toMap.asJava
     ))
   }
@@ -1261,7 +1266,7 @@ class TransactionStateManagerTest {
   private def prepareForTxnMessageAppend(error: Errors): Unit = {
     reset(replicaManager)
 
-    val capturedArgument: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] 
=> Unit])
+    val capturedArgument: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit])
     when(replicaManager.appendRecords(anyLong(),
       anyShort(),
       internalTopicsAllowed = ArgumentMatchers.eq(true),
@@ -1274,7 +1279,12 @@ class TransactionStateManagerTest {
       any()
     )).thenAnswer(_ => capturedArgument.getValue.apply(
       util.Map.of(new TopicIdPartition(transactionTopicId, partitionId, 
TRANSACTION_STATE_TOPIC_NAME),
-              new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+        new PartitionProduceResponse()
+          .setIndex(partitionId)
+          .setErrorCode(error.code)
+          .setBaseOffset(0)
+          .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+          .setLogStartOffset(0)))
     )
     when(replicaManager.topicIdPartition(new 
TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 0))).thenReturn(new 
TopicIdPartition(transactionTopicId, 0, TRANSACTION_STATE_TOPIC_NAME))
     when(replicaManager.topicIdPartition(new 
TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 1))).thenReturn(new 
TopicIdPartition(transactionTopicId, 1, TRANSACTION_STATE_TOPIC_NAME))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3fdcdaf3e49..4aa7b883050 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -56,6 +56,7 @@ import 
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
 OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
 OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, 
OffsetDeleteResponseTopicCollection}
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import 
org.apache.kafka.common.message.ShareFetchRequestData.{AcknowledgementBatch, 
ForgottenTopic}
 import 
org.apache.kafka.common.message.ShareFetchResponseData.{AcquiredRecords, 
PartitionData, ShareFetchableTopicResponse}
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, 
RegisterBrokerRecord, TopicRecord}
@@ -69,7 +70,6 @@ import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
 import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourceType}
@@ -2448,7 +2448,7 @@ class KafkaApisTest extends Logging {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] 
=> Unit])
 
       val produceData = new ProduceRequestData.TopicProduceData()
         .setPartitionData(util.List.of(
@@ -2480,7 +2480,12 @@ class KafkaApisTest extends Logging {
         any(),
         any(),
         any()
-      )).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
+      )).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionProduceResponse()
+        .setIndex(tp.partition)
+        .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+        .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code))))
 
       
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
@@ -2839,7 +2844,7 @@ class KafkaApisTest extends Logging {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] 
=> Unit])
 
       val tp = new TopicIdPartition(topicId, 0, topic)
       val partition = mock(classOf[Partition])
@@ -2874,7 +2879,12 @@ class KafkaApisTest extends Logging {
         any(),
         any(),
         any())
-      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionProduceResponse()
+        .setIndex(tp.partition)
+        .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+        .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code))))
 
       
when(replicaManager.getPartitionOrError(tp.topicPartition())).thenAnswer(_ => 
Right(partition))
       when(partition.leaderReplicaIdOpt).thenAnswer(_ => Some(newLeaderId))
@@ -2913,7 +2923,7 @@ class KafkaApisTest extends Logging {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] 
=> Unit])
 
       val tp = new TopicIdPartition(topicId, 0, topic)
 
@@ -2945,7 +2955,12 @@ class KafkaApisTest extends Logging {
         any(),
         any(),
         any())
-      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionProduceResponse()
+        .setIndex(tp.partition)
+        .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+        .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code))))
 
       
when(replicaManager.getPartitionOrError(tp.topicPartition())).thenAnswer(_ => 
Left(Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
@@ -2983,7 +2998,7 @@ class KafkaApisTest extends Logging {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
+      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] 
=> Unit])
 
       val tp = new TopicIdPartition(topicId, 0, topic)
 
@@ -3017,7 +3032,12 @@ class KafkaApisTest extends Logging {
         any(),
         any(),
         any())
-      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionProduceResponse()
+        .setIndex(tp.partition)
+        .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+        .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+        .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code))))
 
       when(replicaManager.getPartitionOrError(tp.topicPartition)).thenAnswer(_ 
=> Left(Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
@@ -3223,7 +3243,7 @@ class KafkaApisTest extends Logging {
     val expectedErrors = util.Map.of(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION, 
tp2, Errors.NONE)
 
     val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = 
ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
-    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] 
=> Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit])
 
     when(replicaManager.onlinePartition(tp1))
       .thenReturn(None)
@@ -3241,7 +3261,12 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.eq(requestLocal),
       any(),
       any()
-    )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new 
TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE))))
+    )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new 
TopicIdPartition(topicId,tp2), new PartitionProduceResponse()
+      .setIndex(tp2.partition)
+      .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+      .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+      .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+      .setErrorCode(Errors.NONE.code))))
     kafkaApis = createKafkaApis()
     kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)
     verify(requestChannel).sendResponse(
@@ -3364,8 +3389,8 @@ class KafkaApisTest extends Logging {
 
     val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, 
MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionResponse] => Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit])
 
     when(replicaManager.appendRecords(
       ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -3381,7 +3406,12 @@ class KafkaApisTest extends Logging {
     )).thenAnswer { _ =>
       responseCallback.getValue.apply(
         entriesPerPartition.getValue.keySet.map { tp =>
-          tp -> new PartitionResponse(Errors.NONE)
+          tp -> new PartitionProduceResponse()
+            .setIndex(tp.partition)
+            .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+            .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+            .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+            .setErrorCode(Errors.NONE.code)
         }.toMap.asJava
       )
     }
@@ -3535,8 +3565,8 @@ class KafkaApisTest extends Logging {
     // Set up appendRecords to simulate epoch validation failure
     val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, 
MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
-    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionResponse] => Unit])
+    val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, 
PartitionProduceResponse] => Unit])
     
     when(replicaManager.appendRecords(
       ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -3553,7 +3583,12 @@ class KafkaApisTest extends Logging {
       // Simulate epoch validation failure by calling callback with 
INVALID_PRODUCER_EPOCH error
       val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
       responseCallback.getValue.apply(
-        util.Map.of(topicIdPartition, new 
PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))
+        util.Map.of(topicIdPartition, new PartitionProduceResponse()
+          .setIndex(topicIdPartition.partition)
+          .setBaseOffset(ProduceResponse.INVALID_OFFSET)
+          .setLogStartOffset(ProduceResponse.INVALID_OFFSET)
+          .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+          .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code))
       )
     }
     
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0ed1a999d71..e1aa429ddb5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -26,12 +26,13 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.internal.SimpleRecord
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
-import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, 
Uuid}
@@ -295,10 +296,10 @@ class ReplicaManagerConcurrencyTest extends Logging {
         new SimpleRecord(s"$clientId-${sequence + i}".getBytes)
       }
 
-      val future = new CompletableFuture[ProduceResponse.PartitionResponse]()
+      val future = new CompletableFuture[PartitionProduceResponse]()
       val topicIdPartition: common.TopicIdPartition = 
replicaManager.topicIdPartition(topicPartition)
 
-      def produceCallback(results: util.Map[common.TopicIdPartition, 
ProduceResponse.PartitionResponse]): Unit = {
+      def produceCallback(results: util.Map[common.TopicIdPartition, 
PartitionProduceResponse]): Unit = {
         try {
           assertEquals(1, results.size)
 
@@ -307,7 +308,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
           val result = entry.getValue
 
           assertEquals(topicIdPartition, topicPartition)
-          assertEquals(Errors.NONE, result.error)
+          assertEquals(Errors.NONE.code, result.errorCode)
           future.complete(result)
         } catch {
           case e: Throwable => future.completeExceptionally(e)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9639d1e46aa..909b8dad4b0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -36,6 +36,7 @@ import 
org.apache.kafka.common.errors.InvalidPidMappingException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.{DeleteRecordsResponseData, 
FetchResponseData, ShareFetchResponseData}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.Monitorable
@@ -47,7 +48,6 @@ import 
org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
 import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, 
ReplicaSelector, ReplicaView}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
@@ -263,8 +263,8 @@ class ReplicaManagerTest {
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager)
     try {
-      def callback(responseStatus: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
-        assert(responseStatus.values().iterator().next().error == 
Errors.INVALID_REQUIRED_ACKS)
+      def callback(responseStatus: util.Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
+        assert(responseStatus.values().iterator().next().errorCode == 
Errors.INVALID_REQUIRED_ACKS.code)
       }
       rm.appendRecords(
         timeout = 0,
@@ -436,7 +436,7 @@ class ReplicaManagerTest {
 
       val records = MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("first message".getBytes()))
       val appendResult = appendRecords(rm, new TopicPartition(topic, 0), 
records).onFire { response =>
-        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error)
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, response.errorCode)
       }
 
       // Make this replica the follower
@@ -571,7 +571,7 @@ class ReplicaManagerTest {
         val records = MemoryRecords.withIdempotentRecords(Compression.NONE, 
producerId, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
         appendRecords(replicaManager, new TopicPartition(topic, 0), 
records).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -583,7 +583,7 @@ class ReplicaManagerTest {
       val record = MemoryRecords.withIdempotentRecords(Compression.NONE, 
producerId, epoch, outOfRangeSequence,
         new SimpleRecord(s"message: $outOfRangeSequence".getBytes))
       appendRecords(replicaManager, new TopicPartition(topic, 0), 
record).onFire { response =>
-        assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, response.error)
+        assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.code, 
response.errorCode)
         assertEquals(0, response.logStartOffset)
       }
 
@@ -628,7 +628,7 @@ class ReplicaManagerTest {
         val records = MemoryRecords.withIdempotentRecords(Compression.NONE, 
pid, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
         appendRecords(replicaManager, new TopicPartition(topic, partition), 
records).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -705,7 +705,7 @@ class ReplicaManagerTest {
       val records = MemoryRecords.withTransactionalRecords(Compression.NONE, 
producerId, epoch, sequence,
         new SimpleRecord(time.milliseconds(), s"message $sequence".getBytes))
       handleProduceAppend(replicaManager, new TopicPartition(topic, 0), 
records, transactionalId = transactionalId).onFire { response =>
-        assertEquals(Errors.NONE, response.error)
+        assertEquals(Errors.NONE.code, response.errorCode)
       }
       assertLateTransactionCount(Some(0))
 
@@ -720,7 +720,7 @@ class ReplicaManagerTest {
       val abortRecordBatch = 
MemoryRecords.withEndTransactionMarker(producerId, epoch, abortTxnMarker)
       appendRecords(replicaManager, new TopicPartition(topic, 0),
         abortRecordBatch, origin = AppendOrigin.COORDINATOR, 
transactionVersion = TransactionVersion.TV_0.featureLevel()).onFire { response 
=>
-        assertEquals(Errors.NONE, response.error)
+        assertEquals(Errors.NONE.code, response.errorCode)
       }
       assertLateTransactionCount(Some(0))
     } finally {
@@ -760,7 +760,7 @@ class ReplicaManagerTest {
         val records = MemoryRecords.withTransactionalRecords(Compression.NONE, 
producerId, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
         handleProduceAppend(replicaManager, new TopicPartition(topic, 0), 
records, transactionalId = transactionalId).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -805,7 +805,7 @@ class ReplicaManagerTest {
       val commitRecordBatch = 
MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
       appendRecords(replicaManager, new TopicPartition(topic, 0), 
commitRecordBatch,
         origin = AppendOrigin.COORDINATOR, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-        .onFire { response => assertEquals(Errors.NONE, response.error) }
+        .onFire { response => assertEquals(Errors.NONE.code, 
response.errorCode) }
 
       // the LSO has advanced, but the appended commit marker has not been 
replicated, so
       // none of the data from the transaction should be visible yet
@@ -871,7 +871,7 @@ class ReplicaManagerTest {
         val records = MemoryRecords.withTransactionalRecords(Compression.NONE, 
producerId, epoch, sequence,
           new SimpleRecord(s"message $sequence".getBytes))
         handleProduceAppend(replicaManager, new TopicPartition(topic, 0), 
records, transactionalId = transactionalId).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -880,7 +880,7 @@ class ReplicaManagerTest {
       val abortRecordBatch = 
MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
       appendRecords(replicaManager, new TopicPartition(topic, 0), 
abortRecordBatch,
         origin = AppendOrigin.COORDINATOR, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-        .onFire { response => assertEquals(Errors.NONE, response.error) }
+        .onFire { response => assertEquals(Errors.NONE.code, 
response.errorCode) }
 
       // fetch as follower to advance the high watermark
       fetchPartitionAsFollower(
@@ -942,7 +942,7 @@ class ReplicaManagerTest {
       for (i <- 1 to 2) {
         val records = TestUtils.singletonRecords(s"message $i".getBytes)
         appendRecords(rm, new TopicPartition(topic, 0), records).onFire { 
response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -999,7 +999,7 @@ class ReplicaManagerTest {
       // Leader appends some data
       for (i <- 1 to 5) {
         appendRecords(replicaManager, tp, TestUtils.singletonRecords(s"message 
$i".getBytes)).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -1148,10 +1148,10 @@ class ReplicaManagerTest {
       // Append a couple of messages.
       for (i <- 1 to 2) {
         appendRecords(replicaManager, tp0, 
TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
         appendRecords(replicaManager, tp1, 
TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
@@ -1419,7 +1419,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
 
       appendRecords(replicaManager, tp0, 
TestUtils.singletonRecords(s"message".getBytes)).onFire { response =>
-        assertEquals(Errors.NONE, response.error)
+        assertEquals(Errors.NONE.code, response.errorCode)
       }
       // Fetch as follower to initialise the log end offset of the replica
       fetchPartitionAsFollower(
@@ -1897,7 +1897,7 @@ class ReplicaManagerTest {
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
       callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE))
-      assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
+      assertEquals(Errors.INVALID_TXN_STATE.code, result.assertFired.errorCode)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
       // This time verification is successful.
@@ -1970,7 +1970,7 @@ class ReplicaManagerTest {
 
       if (error != Errors.CONCURRENT_TRANSACTIONS) {
         // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS
-        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, 
result.assertFired.errorCode)
       } else {
         // The append should not finish with error, it should retry later.
         assertFalse(result.hasFired)
@@ -2033,7 +2033,7 @@ class ReplicaManagerTest {
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
       callback.complete(util.Map.of(tp0, Errors.INVALID_PRODUCER_ID_MAPPING))
-      assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, 
result.assertFired.error)
+      assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING.code, 
result.assertFired.errorCode)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
       // Try to append a higher sequence (1) after the first one failed with a 
retriable error.
@@ -2056,7 +2056,7 @@ class ReplicaManagerTest {
       val callback2: AddPartitionsToTxnManager.AppendCallback = 
appendCallback2.getValue
       callback2.complete(util.Map.of())
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
-      assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 
result2.assertFired.error)
+      assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.code, 
result2.assertFired.errorCode)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -2126,7 +2126,7 @@ class ReplicaManagerTest {
       verifyNoMoreInteractions(addPartitionsToTxnManager)
 
       // broker returns the fencing error
-      assertEquals(Errors.INVALID_PRODUCER_EPOCH, result2.assertFired.error)
+      assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, 
result2.assertFired.errorCode)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -2154,7 +2154,7 @@ class ReplicaManagerTest {
 
       handleProduceAppendToMultipleTopics(replicaManager, Map(tp0 -> 
transactionalRecords, tp1 -> transactionalRecords), transactionalId).onFire { 
responses =>
         responses.foreach {
-          entry => assertEquals(Errors.NONE, entry._2.error)
+          entry => assertEquals(Errors.NONE.code, entry._2.errorCode)
         }
       }
     } finally {
@@ -2213,7 +2213,7 @@ class ReplicaManagerTest {
 
       // We should not add these partitions to the manager to verify, but 
instead throw an error.
       handleProduceAppend(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId).onFire { response =>
-        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error)
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, response.errorCode)
       }
       verify(addPartitionsToTxnManager, 
times(0)).addOrVerifyTransaction(any(), any(), any(), any(), any(), any())
     } finally {
@@ -2245,7 +2245,7 @@ class ReplicaManagerTest {
       val transactionalRecords = 
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, 
producerEpoch, sequence,
         new SimpleRecord(s"message $sequence".getBytes))
       handleProduceAppend(replicaManager, tp, transactionalRecords, 
transactionalId = transactionalId).onFire { response =>
-        assertEquals(Errors.NONE, response.error)
+        assertEquals(Errors.NONE.code, response.errorCode)
       }
       assertEquals(VerificationGuard.SENTINEL, 
getVerificationGuard(replicaManager, tp, producerId))
 
@@ -2317,7 +2317,7 @@ class ReplicaManagerTest {
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
       callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE))
-      assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
+      assertEquals(Errors.INVALID_TXN_STATE.code, result.assertFired.errorCode)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
       // This time we do not verify
@@ -2372,7 +2372,7 @@ class ReplicaManagerTest {
       // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
       callback.complete(util.Map.of(tp0, error))
-      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, 
result.assertFired.errorCode)
       assertEquals(expectedMessage, result.assertFired.errorMessage)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2422,7 +2422,7 @@ class ReplicaManagerTest {
       // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
       callback.complete(util.Map.of(tp0, error))
-      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, 
result.assertFired.errorCode)
       assertEquals(expectedMessage, result.assertFired.errorMessage)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -2459,9 +2459,9 @@ class ReplicaManagerTest {
     replicaManager: ReplicaManager,
     topicPartition: TopicIdPartition,
     numOfRecords: Int
-  ): AtomicReference[PartitionResponse] = {
-    val produceResult = new AtomicReference[PartitionResponse]()
-    def callback(response: util.Map[TopicIdPartition, PartitionResponse]): 
Unit = {
+  ): AtomicReference[PartitionProduceResponse] = {
+    val produceResult = new AtomicReference[PartitionProduceResponse]()
+    def callback(response: util.Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       produceResult.set(response.get(topicPartition))
     }
 
@@ -2705,10 +2705,10 @@ class ReplicaManagerTest {
                             records: MemoryRecords,
                             origin: AppendOrigin = AppendOrigin.CLIENT,
                             requiredAcks: Short = -1,
-                            transactionVersion: Short = 
TransactionVersion.TV_UNKNOWN): CallbackResult[PartitionResponse] = {
-    val result = new CallbackResult[PartitionResponse]()
+                            transactionVersion: Short = 
TransactionVersion.TV_UNKNOWN): CallbackResult[PartitionProduceResponse] = {
+    val result = new CallbackResult[PartitionProduceResponse]()
     val topicIdPartition = new TopicIdPartition(topicId, partition)
-    def appendCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+    def appendCallback(responses: util.Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       val response = responses.get(topicIdPartition)
       assertNotNull(response)
       result.fire(response)
@@ -2732,9 +2732,9 @@ class ReplicaManagerTest {
                                                   transactionalId: String,
                                                   requiredAcks: Short = -1,
                                                   
transactionSupportedOperation: TransactionSupportedOperation = 
GENERIC_ERROR_SUPPORTED
-                                                 ): 
CallbackResult[Map[TopicIdPartition, PartitionResponse]] = {
-    val result = new CallbackResult[Map[TopicIdPartition, PartitionResponse]]()
-    def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
+                                                 ): 
CallbackResult[Map[TopicIdPartition, PartitionProduceResponse]] = {
+    val result = new CallbackResult[Map[TopicIdPartition, 
PartitionProduceResponse]]()
+    def appendCallback(responses: Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       responses.foreach( response => 
assertTrue(responses.get(response._1).isDefined))
       result.fire(responses)
     }
@@ -2759,11 +2759,11 @@ class ReplicaManagerTest {
                                   requiredAcks: Short = -1,
                                   transactionalId: String,
                                   transactionSupportedOperation: 
TransactionSupportedOperation = GENERIC_ERROR_SUPPORTED
-                                 ): CallbackResult[PartitionResponse] = {
-    val result = new CallbackResult[PartitionResponse]()
+                                 ): CallbackResult[PartitionProduceResponse] = 
{
+    val result = new CallbackResult[PartitionProduceResponse]()
 
     val topicIdPartition = new 
TopicIdPartition(topicIds.get(partition.topic()).getOrElse(Uuid.ZERO_UUID), 
partition)
-    def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
+    def appendCallback(responses: Map[TopicIdPartition, 
PartitionProduceResponse]): Unit = {
       val response = responses.get(topicIdPartition)
       assertTrue(response.isDefined)
       result.fire(response.get)
@@ -4521,7 +4521,7 @@ class ReplicaManagerTest {
         new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, 
Optional.empty()),
         replicaId = otherId
       )
-      assertEquals(Errors.NONE, leaderResponse.get.error)
+      assertEquals(Errors.NONE.code, leaderResponse.get.errorCode)
 
       // Change the local replica to follower
       val followerTopicsDelta = 
topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
@@ -4530,7 +4530,7 @@ class ReplicaManagerTest {
 
       // Append on a follower should fail
       val followerResponse = sendProducerAppend(replicaManager, 
topicIdPartition, numOfRecords)
-      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, 
followerResponse.get.errorCode)
 
       // Check the state of that partition and fetcher
       val followerPartition = 
getOnlinePartition(replicaManager.getPartition(topicPartition))
@@ -4582,7 +4582,7 @@ class ReplicaManagerTest {
       val followerResponse = sendProducerAppend(replicaManager,
         new 
TopicIdPartition(followerMetadataImage.topics().topicsByName().get("foo").id, 
topicPartition),
         numOfRecords)
-      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, 
followerResponse.get.errorCode)
 
       // Change the local replica to leader
       val leaderTopicsDelta = 
topicsChangeDelta(followerMetadataImage.topics(), localId, true)
@@ -4598,7 +4598,7 @@ class ReplicaManagerTest {
         new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, 
Optional.empty()),
         replicaId = otherId
       )
-      assertEquals(Errors.NONE, leaderResponse.get.error)
+      assertEquals(Errors.NONE.code, leaderResponse.get.errorCode)
 
       val leaderPartition = 
getOnlinePartition(replicaManager.getPartition(topicPartition))
       assertTrue(leaderPartition.isLeader)
@@ -4898,7 +4898,7 @@ class ReplicaManagerTest {
       }
 
       // Check that the produce failed because it changed to follower before 
replicating
-      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, 
leaderResponse.get.errorCode)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -5397,7 +5397,7 @@ class ReplicaManagerTest {
       // Leader appends some data
       for (i <- 1 to 5) {
         appendRecords(replicaManager, tp, TestUtils.singletonRecords(s"message 
$i".getBytes)).onFire { response =>
-          assertEquals(Errors.NONE, response.error)
+          assertEquals(Errors.NONE.code, response.errorCode)
         }
       }
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java
index f8aa623cf4b..9d38d6a53bc 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java
@@ -19,6 +19,7 @@ package org.apache.kafka.jmh.producer;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ProduceResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
@@ -34,6 +35,7 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
 
 import java.util.AbstractMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -50,17 +52,18 @@ public class ProducerResponseBenchmark {
     private static final int NUMBER_OF_RECORDS = 3;
     private static final Uuid TOPIC_ID = Uuid.randomUuid();
     private static final String TOPIC_NAME = "tp";
-    private static final Map<TopicIdPartition, 
ProduceResponse.PartitionResponse> PARTITION_RESPONSE_MAP = IntStream.range(0, 
NUMBER_OF_PARTITIONS)
+    private static final Map<TopicIdPartition, 
ProduceResponseData.PartitionProduceResponse> PARTITION_RESPONSE_MAP = 
IntStream.range(0, NUMBER_OF_PARTITIONS)
         .mapToObj(partitionIndex -> new AbstractMap.SimpleEntry<>(
             new TopicIdPartition(TOPIC_ID, partitionIndex, TOPIC_NAME),
-            new ProduceResponse.PartitionResponse(
-                Errors.NONE,
-                0,
-                0,
-                0,
-                IntStream.range(0, NUMBER_OF_RECORDS)
-                    .mapToObj(ProduceResponse.RecordError::new)
-                    .toList())
+            new ProduceResponseData.PartitionProduceResponse()
+                .setIndex(partitionIndex)
+                .setBaseOffset(0)
+                .setLogAppendTimeMs(0)
+                .setLogStartOffset(0)
+                .setErrorCode(Errors.NONE.code())
+                .setRecordErrors(IntStream.range(0, NUMBER_OF_RECORDS)
+                    .mapToObj(i -> new 
ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(i))
+                    .collect(Collectors.toList()))
         ))
         .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, 
AbstractMap.SimpleEntry::getValue));
 
@@ -70,7 +73,7 @@ public class ProducerResponseBenchmark {
      */
     @SuppressWarnings("deprecation")
     private static ProduceResponse response() {
-        return new ProduceResponse(PARTITION_RESPONSE_MAP);
+        return new ProduceResponse(PARTITION_RESPONSE_MAP, List.of(), 
AbstractResponse.DEFAULT_THROTTLE_TIME);
     }
 
     private static final ProduceResponse RESPONSE = response();
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
index b7ce44686d1..6d776d3fcfb 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
@@ -18,9 +18,9 @@ package org.apache.kafka.server.purgatory;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse;
 import org.apache.kafka.common.metrics.internals.MetricsUtils;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 
 import com.yammer.metrics.core.Meter;
@@ -50,16 +50,16 @@ public class DelayedProduce extends DelayedOperation {
 
     public static final class ProducePartitionStatus {
         private final long requiredOffset;
-        private final PartitionResponse responseStatus;
+        private final PartitionProduceResponse responseStatus;
 
         private volatile boolean acksPending;
 
-        public ProducePartitionStatus(long requiredOffset, PartitionResponse 
responseStatus) {
+        public ProducePartitionStatus(long requiredOffset, 
PartitionProduceResponse responseStatus) {
             this.requiredOffset = requiredOffset;
             this.responseStatus = responseStatus;
         }
 
-        public PartitionResponse responseStatus() {
+        public PartitionProduceResponse responseStatus() {
             return responseStatus;
         }
 
@@ -72,8 +72,8 @@ public class DelayedProduce extends DelayedOperation {
             return String.format(
                     "[acksPending: %s, error: %s, startOffset: %s, 
requiredOffset: %d]",
                     acksPending,
-                    responseStatus.error.code(),
-                    responseStatus.baseOffset,
+                    responseStatus.errorCode(),
+                    responseStatus.baseOffset(),
                     requiredOffset
             );
         }
@@ -95,12 +95,12 @@ public class DelayedProduce extends DelayedOperation {
 
     private final Map<TopicIdPartition, ProducePartitionStatus> produceStatus;
     private final PartitionStatusValidator statusValidator;
-    private final Consumer<Map<TopicIdPartition, PartitionResponse>> 
responseCallback;
+    private final Consumer<Map<TopicIdPartition, PartitionProduceResponse>> 
responseCallback;
 
     public DelayedProduce(long delayMs,
                           Map<TopicIdPartition, ProducePartitionStatus> 
produceStatus,
                           PartitionStatusValidator statusValidator,
-                          Consumer<Map<TopicIdPartition, PartitionResponse>> 
responseCallback) {
+                          Consumer<Map<TopicIdPartition, 
PartitionProduceResponse>> responseCallback) {
         super(delayMs);
 
         this.produceStatus = produceStatus;
@@ -109,10 +109,10 @@ public class DelayedProduce extends DelayedOperation {
 
         // first update the acks pending variable according to the error code
         produceStatus.forEach((topicPartition, status) -> {
-            if (status.responseStatus.error == Errors.NONE) {
+            if (status.responseStatus.errorCode() == Errors.NONE.code()) {
                 // Timeout error state will be cleared when required acks are 
received
                 status.acksPending = true;
-                status.responseStatus.error = Errors.REQUEST_TIMED_OUT;
+                
status.responseStatus.setErrorCode(Errors.REQUEST_TIMED_OUT.code());
             } else {
                 status.acksPending = false;
             }
@@ -151,7 +151,7 @@ public class DelayedProduce extends DelayedOperation {
                 Errors errors = result.error;
                 if (errors != Errors.NONE || result.hasEnough()) {
                     status.setAcksPending(false);
-                    status.responseStatus.error = errors;
+                    status.responseStatus.setErrorCode(errors.code());
                 }
             }
         });
@@ -186,7 +186,7 @@ public class DelayedProduce extends DelayedOperation {
      */
     @Override
     public void onComplete() {
-        Map<TopicIdPartition, PartitionResponse> responseStatus = new 
HashMap<>();
+        Map<TopicIdPartition, PartitionProduceResponse> responseStatus = new 
HashMap<>();
 
         for (Map.Entry<TopicIdPartition, ProducePartitionStatus> entry : 
produceStatus.entrySet()) {
             responseStatus.put(entry.getKey(), 
entry.getValue().responseStatus());

Reply via email to