This is an automated email from the ASF dual-hosted git repository. FrankYang0529 pushed a commit to branch KAFKA-10730 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4274566a3cd1bd9f5351b09bba52f1604924068c Author: PoAn Yang <[email protected]> AuthorDate: Fri May 9 13:38:03 2025 +0800 KAFKA-10730: KafkaApis#handleProduceRequest should use auto-generated protocol Signed-off-by: PoAn Yang <[email protected]> --- .../kafka/common/requests/ProduceResponse.java | 65 ++------------- .../kafka/clients/producer/KafkaProducerTest.java | 15 +++- .../clients/producer/internals/SenderTest.java | 81 ++++++++++++++----- .../producer/internals/TransactionManagerTest.java | 15 +++- .../kafka/common/requests/ProduceResponseTest.java | 43 ++++++---- .../kafka/common/requests/RequestResponseTest.java | 38 ++++++--- .../transaction/TransactionStateManager.scala | 16 ++-- core/src/main/scala/kafka/server/KafkaApis.scala | 70 ++++++++++------ .../main/scala/kafka/server/ReplicaManager.scala | 49 ++++++----- .../kafka/server/LocalLeaderEndPointTest.scala | 38 ++++----- .../AbstractCoordinatorConcurrencyTest.scala | 18 ++++- .../transaction/TransactionStateManagerTest.scala | 20 +++-- .../scala/unit/kafka/server/KafkaApisTest.scala | 69 ++++++++++++---- .../server/ReplicaManagerConcurrencyTest.scala | 9 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 94 +++++++++++----------- .../jmh/producer/ProducerResponseBenchmark.java | 23 +++--- .../kafka/server/purgatory/DelayedProduce.java | 24 +++--- 17 files changed, 405 insertions(+), 282 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 9d7817e4902..d1e688c2b8d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -16,10 +16,10 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.message.ProduceResponseData; -import org.apache.kafka.common.message.ProduceResponseData.LeaderIdAndEpoch; +import org.apache.kafka.common.message.ProduceResponseData.NodeEndpoint; +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; @@ -29,7 +29,6 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; /** * This wrapper supports both v0 and v8 of ProduceResponse. @@ -65,42 +64,11 @@ public class ProduceResponse extends AbstractResponse { this.data = produceResponseData; } - /** - * Constructor for Version 0 - * This is deprecated in favor of using the ProduceResponseData constructor, KafkaApis should switch to that - * in KAFKA-10730 - * @param responses Produced data grouped by topic-partition - */ - @Deprecated - public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses) { - this(responses, DEFAULT_THROTTLE_TIME, List.of()); + public ProduceResponse(Map<TopicIdPartition, PartitionProduceResponse> responses, List<NodeEndpoint> nodeEndpoints, int throttleTimeMs) { + this(toData(responses, nodeEndpoints, throttleTimeMs)); } - /** - * This is deprecated in favor of using the ProduceResponseData constructor, KafkaApis should switch to that - * in KAFKA-10730 - * @param responses Produced data grouped by topic-partition - * @param throttleTimeMs Time in milliseconds the response was throttled - */ - @Deprecated - public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs) { - this(toData(responses, throttleTimeMs, List.of())); - } - - /** - * Constructor for the latest version - * This is deprecated in favor of using the ProduceResponseData constructor, KafkaApis should switch to that - * in KAFKA-10730 - * @param responses Produced data grouped by topic-partition - * @param throttleTimeMs Time in milliseconds the response was throttled - * @param nodeEndpoints List of node endpoints - */ - @Deprecated - public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) { - this(toData(responses, throttleTimeMs, nodeEndpoints)); - } - - private static ProduceResponseData toData(Map<TopicIdPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) { + private static ProduceResponseData toData(Map<TopicIdPartition, PartitionProduceResponse> responses, List<NodeEndpoint> nodeEndpoints, int throttleTimeMs) { ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs); responses.forEach((tp, response) -> { ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic(), tp.topicId()); @@ -108,28 +76,9 @@ public class ProduceResponse extends AbstractResponse { tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()).setTopicId(tp.topicId()); data.responses().add(tpr); } - tpr.partitionResponses() - .add(new ProduceResponseData.PartitionProduceResponse() - .setIndex(tp.partition()) - .setBaseOffset(response.baseOffset) - .setLogStartOffset(response.logStartOffset) - .setLogAppendTimeMs(response.logAppendTime) - .setErrorMessage(response.errorMessage) - .setErrorCode(response.error.code()) - .setCurrentLeader(response.currentLeader != null ? response.currentLeader : new LeaderIdAndEpoch()) - .setRecordErrors(response.recordErrors - .stream() - .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage() - .setBatchIndex(e.batchIndex) - .setBatchIndexErrorMessage(e.message)) - .collect(Collectors.toList()))); + tpr.partitionResponses().add(response); }); - nodeEndpoints.forEach(endpoint -> data.nodeEndpoints() - .add(new ProduceResponseData.NodeEndpoint() - .setNodeId(endpoint.id()) - .setHost(endpoint.host()) - .setPort(endpoint.port()) - .setRack(endpoint.rack()))); + nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(endpoint)); return data; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 98ffd66270c..61c15453f3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -59,6 +59,7 @@ import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.InitProducerIdResponseData; +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetric; @@ -3005,11 +3006,17 @@ public class KafkaProducerTest { assertDoesNotThrow(() -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()).close()); } - @SuppressWarnings("deprecation") private ProduceResponse produceResponse(TopicIdPartition topicIdPartition, long offset, Errors error, int throttleTimeMs, int logStartOffset) { - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); - Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(topicIdPartition, resp); - return new ProduceResponse(partResp, throttleTimeMs); + return new ProduceResponse(Map.of( + topicIdPartition, + new PartitionProduceResponse() + .setIndex(topicIdPartition.partition()) + .setBaseOffset(offset) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(logStartOffset) + .setErrorCode(error.code())), + List.of(), + throttleTimeMs); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 52dd2e08e58..59ce8da1c6a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -70,6 +70,7 @@ import org.apache.kafka.common.record.internal.MutableRecordBatch; import org.apache.kafka.common.record.internal.Record; import org.apache.kafka.common.record.internal.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; @@ -2448,9 +2449,17 @@ public class SenderTest { assertTrue(inflightBatch.isInflight(), "Batch should be marked inflight after being sent"); assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true"); - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); - responseMap.put(tpId, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE)); - client.respond(new ProduceResponse(responseMap)); + Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> responseMap = new HashMap<>(); + responseMap.put( + tpId, + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tpId.partition()) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.MESSAGE_TOO_LARGE.code()) + ); + client.respond(new ProduceResponse(responseMap, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME)); sender.runOnce(); // split and reenqueue assertFalse(inflightBatch.isInflight(), "Batch should be marked as not inflight after being split and re-enqueued"); assertEquals(2, txnManager.sequenceNumber(tpId.topicPartition()), "The next sequence should be 2"); @@ -2467,9 +2476,17 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true"); - responseMap.put(tpId, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); + responseMap.put( + tpId, + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tpId.partition()) + .setBaseOffset(0) + .setLogStartOffset(0) + .setLogAppendTimeMs(0) + .setErrorCode(Errors.NONE.code()) + ); client.respond(produceRequestMatcher(tpId.topicPartition(), producerIdAndEpoch, 0, txnManager.isTransactional()), - new ProduceResponse(responseMap)); + new ProduceResponse(responseMap, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME)); sender.runOnce(); // receive assertTrue(f1.isDone(), "The future should have been done."); @@ -2484,9 +2501,17 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true"); - responseMap.put(tpId, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L)); + responseMap.put( + tpId, + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tpId.partition()) + .setBaseOffset(1) + .setLogStartOffset(0) + .setLogAppendTimeMs(0) + .setErrorCode(Errors.NONE.code()) + ); client.respond(produceRequestMatcher(tpId.topicPartition(), producerIdAndEpoch, 1, txnManager.isTransactional()), - new ProduceResponse(responseMap)); + new ProduceResponse(responseMap, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME)); sender.runOnce(); // receive assertTrue(f2.isDone(), "The future should have been done."); @@ -2539,9 +2564,16 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one in-flight batch in accumulator"); - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); - responseMap.put(new TopicIdPartition(TOPIC_ID, tp0), new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); - client.respond(new ProduceResponse(responseMap)); + Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> responseMap = Map.of( + new TopicIdPartition(TOPIC_ID, tp0), + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp0.partition()) + .setBaseOffset(0) + .setLogStartOffset(0) + .setLogAppendTimeMs(0) + .setErrorCode(Errors.NONE.code()) + ); + client.respond(new ProduceResponse(responseMap, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME)); time.sleep(deliveryTimeoutMs); sender.runOnce(); // receive first response @@ -2719,9 +2751,16 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one in-flight batch in accumulator"); - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); - responseMap.put(new TopicIdPartition(TOPIC_ID, tp0), new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); - client.respond(new ProduceResponse(responseMap)); + Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> responseMap = Map.of( + new TopicIdPartition(TOPIC_ID, tp0), + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp0.partition()) + .setBaseOffset(0) + .setLogStartOffset(0) + .setLogAppendTimeMs(0) + .setErrorCode(Errors.NONE.code()) + ); + client.respond(new ProduceResponse(responseMap, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME)); // Successfully expire both batches. time.sleep(deliveryTimeoutMs); @@ -3756,12 +3795,18 @@ public class SenderTest { null, MAX_BLOCK_TIMEOUT, time.milliseconds(), TestUtils.singletonCluster()).future; } - @SuppressWarnings("deprecation") private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) { - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, - RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage); - Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(new TopicIdPartition(TOPIC_ID, tp), resp); - return new ProduceResponse(partResp, throttleTimeMs); + return new ProduceResponse(Map.of( + new TopicIdPartition(TOPIC_ID, tp), + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp.partition()) + .setBaseOffset(offset) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(logStartOffset) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage)), + List.of(), + throttleTimeMs); } private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> responses) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a13e44a5e16..ccd9b713700 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.InitProducerIdResponseData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -4484,11 +4485,17 @@ public class TransactionManagerTest { return produceResponse(tp, offset, error, throttleTimeMs, 10); } - @SuppressWarnings("deprecation") private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, int logStartOffset) { - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); - Map<TopicIdPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(new TopicIdPartition(TOPIC_ID, tp), resp); - return new ProduceResponse(partResp, throttleTimeMs); + return new ProduceResponse(Map.of( + new TopicIdPartition(TOPIC_ID, tp), + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp.partition()) + .setBaseOffset(offset) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(logStartOffset) + .setErrorCode(error.code())), + List.of(), + throttleTimeMs); } private void initializeIdempotentProducerId(long producerId, short epoch) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java index 7a9a8e5ca25..b9f0682991d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java @@ -26,8 +26,6 @@ import org.apache.kafka.common.record.internal.RecordBatch; import org.junit.jupiter.api.Test; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,16 +36,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ProduceResponseTest { - @SuppressWarnings("deprecation") @Test public void produceResponseVersionTest() { - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); Uuid topicId = Uuid.fromString("5JkYABorYD4w0AQXe9TvBG"); TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, 0, "test"); - responseData.put(topicIdPartition, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100)); - ProduceResponse v0Response = new ProduceResponse(responseData); - ProduceResponse v1Response = new ProduceResponse(responseData, 10); - ProduceResponse v2Response = new ProduceResponse(responseData, 10); + Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> responseData = Map.of( + topicIdPartition, + new ProduceResponseData.PartitionProduceResponse() + .setIndex(0) + .setBaseOffset(10000) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(100) + .setErrorCode(Errors.NONE.code()) + ); + ProduceResponse v0Response = new ProduceResponse(responseData, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME); + ProduceResponse v1Response = new ProduceResponse(responseData, List.of(), 10); + ProduceResponse v2Response = new ProduceResponse(responseData, List.of(), 10); assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero"); assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10"); assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10"); @@ -71,17 +75,26 @@ public class ProduceResponseTest { @SuppressWarnings("deprecation") @Test public void produceResponseRecordErrorsTest() { - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); Uuid topicId = Uuid.fromString("4w0AQXe9TvBG5JkYABorYD"); TopicIdPartition tp = new TopicIdPartition(topicId, 0, "test"); - ProduceResponse.PartitionResponse partResponse = new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100, - Collections.singletonList(new ProduceResponse.RecordError(3, "Record error")), - "Produce failed"); - responseData.put(tp, partResponse); + Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> responseData = Map.of( + tp, + new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp.partition()) + .setBaseOffset(10000) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(100) + .setErrorCode(Errors.NONE.code()) + .setRecordErrors(List.of( + new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(3) + .setBatchIndexErrorMessage("Record error") + )) + .setErrorMessage("Produce failed") + ); for (short version : PRODUCE.allVersions()) { - ProduceResponse response = new ProduceResponse(responseData); + ProduceResponse response = new ProduceResponse(responseData, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME); ProduceResponse produceResponse = ProduceResponse.parse(response.serialize(version), version); ProduceResponseData.TopicProduceResponse topicProduceResponse = produceResponse.data().responses().iterator().next(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index a87a835fac0..e6c478910e4 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -2543,21 +2543,35 @@ public class RequestResponseTest { @SuppressWarnings("deprecation") private ProduceResponse createProduceResponse() { - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); - Uuid topicId = Uuid.fromString("0AQorYD4we9TvBGX5JkYAB"); - responseData.put(new TopicIdPartition(topicId, 0, "test"), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100)); - return new ProduceResponse(responseData, 0); + return new ProduceResponse(Map.of( + new TopicIdPartition(Uuid.fromString("0AQorYD4we9TvBGX5JkYAB"), 0, "test"), + new ProduceResponseData.PartitionProduceResponse() + .setIndex(0) + .setBaseOffset(10000) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(100) + .setErrorCode(Errors.NONE.code())), + List.of(), + 0); } - @SuppressWarnings("deprecation") private ProduceResponse createProduceResponseWithErrorMessage() { - Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); - Uuid topicId = Uuid.fromString("0AQorYD4we9TvBGX5JkYAB"); - responseData.put(new TopicIdPartition(topicId, 0, "test"), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100, singletonList(new ProduceResponse.RecordError(0, "error message")), - "global error message")); - return new ProduceResponse(responseData, 0); + return new ProduceResponse(Map.of( + new TopicIdPartition(Uuid.fromString("0AQorYD4we9TvBGX5JkYAB"), 0, "test"), + new ProduceResponseData.PartitionProduceResponse() + .setIndex(0) + .setBaseOffset(10000) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(100) + .setErrorCode(Errors.NONE.code()) + .setRecordErrors(List.of( + new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(0) + .setBatchIndexErrorMessage("error message") + )) + .setErrorMessage("global error message")), + List.of(), + 0); } private SaslHandshakeRequest createSaslHandshakeRequest(short version) { diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index fae1c857654..ced4770f032 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -26,12 +26,12 @@ import kafka.utils.Logging import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.ListTransactionsResponseData +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.stats.{Avg, Max} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.record.internal.{FileRecords, MemoryRecords, MemoryRecordsBuilder, Record, SimpleRecord} -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition} @@ -259,7 +259,7 @@ class TransactionStateManager(brokerId: Int, expiredForPartition: Iterable[TransactionalIdCoordinatorEpochAndMetadata], tombstoneRecords: MemoryRecords ): Unit = { - def removeFromCacheCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = { + def removeFromCacheCallback(responses: util.Map[TopicIdPartition, PartitionProduceResponse]): Unit = { responses.forEach { (topicPartition, response) => inReadLock[Exception](stateLock, () => { transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry => @@ -270,11 +270,11 @@ class TransactionStateManager(brokerId: Int, if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch && txnMetadata.pendingState.filter(s => s == TransactionState.DEAD).isPresent && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch - && response.error == Errors.NONE) { + && response.errorCode == Errors.NONE.code) { txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId) } else { warn(s"Failed to remove expired transactionalId: $transactionalId" + - s" from cache. Tombstone append error code: ${response.error}," + + s" from cache. Tombstone append error code: ${response.errorCode}," + s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," + s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," + s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " + @@ -671,7 +671,7 @@ class TransactionStateManager(brokerId: Int, val recordsPerPartition = Map(transactionStateTopicIdPartition -> records) // set the callback function to update transaction status in cache after log append completed - def updateCacheCallback(responseStatus: util.Map[TopicIdPartition, PartitionResponse]): Unit = { + def updateCacheCallback(responseStatus: util.Map[TopicIdPartition, PartitionProduceResponse]): Unit = { // the append response should only contain the topics partition if (responseStatus.size != 1 || !responseStatus.containsKey(transactionStateTopicIdPartition)) throw new IllegalStateException("Append status %s should only have one partition %s" @@ -679,13 +679,13 @@ class TransactionStateManager(brokerId: Int, val status = responseStatus.get(transactionStateTopicIdPartition) - var responseError = if (status.error == Errors.NONE) { + var responseError = if (status.errorCode == Errors.NONE.code) { Errors.NONE } else { - debug(s"Appending $transactionalId's new metadata $newMetadata failed due to ${status.error.exceptionName}") + debug(s"Appending $transactionalId's new metadata $newMetadata failed due to ${Errors.forCode(status.errorCode).exceptionName}") // transform the log append error code to the corresponding coordinator error code - status.error match { + Errors.forCode(status.errorCode) match { case Errors.UNKNOWN_TOPIC_OR_PARTITION | Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9e923563ad0..9cc68b73142 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -40,6 +40,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection} +import org.apache.kafka.common.message.ProduceResponseData.{NodeEndpoint, PartitionProduceResponse} import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName @@ -48,7 +49,6 @@ import org.apache.kafka.common.record.internal._ import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType._ @@ -79,7 +79,6 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} import java.util.stream.Collectors import java.util.{Collections, Optional} -import scala.annotation.nowarn import scala.collection.mutable.ArrayBuffer import scala.collection.{Map, Seq, Set, mutable} import scala.jdk.CollectionConverters._ @@ -406,9 +405,9 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() - val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() - val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionResponse]() + val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, PartitionProduceResponse]() + val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionProduceResponse]() + val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionProduceResponse]() val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]() val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)] @@ -423,7 +422,13 @@ class KafkaApis(val requestChannel: RequestChannel, val topicPartition = new TopicPartition(topicName, partition.index()) // To be compatible with the old version, only return UNKNOWN_TOPIC_ID if request version uses topicId, but the corresponding topic name can't be found. if (topicName.isEmpty && request.header.apiVersion > 12) - nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID) + nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> + new PartitionProduceResponse() + .setIndex(topicPartition.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) else topicIdToPartitionData += new TopicIdPartition(topicId, topicPartition) -> partition } @@ -437,44 +442,61 @@ class KafkaApis(val requestChannel: RequestChannel, // https://issues.apache.org/jira/browse/KAFKA-10698 val memoryRecords = partition.records.asInstanceOf[MemoryRecords] if (!authorizedTopics.contains(topicIdPartition.topic)) - unauthorizedTopicResponses += topicIdPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) + unauthorizedTopicResponses += topicIdPartition -> + new PartitionProduceResponse() + .setIndex(topicIdPartition.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topicIdPartition.topicPartition)) - nonExistingTopicResponses += topicIdPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) + nonExistingTopicResponses += topicIdPartition -> + new PartitionProduceResponse() + .setIndex(topicIdPartition.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) else try { ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords) authorizedRequestInfo += (topicIdPartition -> memoryRecords) } catch { case e: ApiException => - invalidRequestResponses += topicIdPartition -> new PartitionResponse(Errors.forException(e)) + invalidRequestResponses += topicIdPartition -> + new PartitionProduceResponse() + .setIndex(topicIdPartition.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.forException(e).code) } } - // the callback for sending a produce response - // The construction of ProduceResponse is able to accept auto-generated protocol data so - // KafkaApis#handleProduceRequest should apply auto-generated protocol to avoid extra conversion. - // https://issues.apache.org/jira/browse/KAFKA-10730 - @nowarn("cat=deprecation") - def sendResponseCallback(responseStatus: Map[TopicIdPartition, PartitionResponse]): Unit = { + def sendResponseCallback(responseStatus: Map[TopicIdPartition, PartitionProduceResponse]): Unit = { val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses var errorInResponse = false - val nodeEndpoints = new mutable.HashMap[Int, Node] + val nodeEndpoints = new mutable.HashMap[Int, NodeEndpoint] mergedResponseStatus.foreachEntry { (topicIdPartition, status) => - if (status.error != Errors.NONE) { + if (status.errorCode != Errors.NONE.code) { errorInResponse = true debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( request.header.correlationId, request.header.clientId, topicIdPartition, - status.error.exceptionName)) + Errors.forCode(status.errorCode).exceptionName)) if (request.header.apiVersion >= 10) { - status.error match { + Errors.forCode(status.errorCode) match { case Errors.NOT_LEADER_OR_FOLLOWER => val leaderNode = getCurrentLeader(topicIdPartition.topicPartition(), request.context.listenerName) leaderNode.node.foreach { node => - nodeEndpoints.put(node.id(), node) + nodeEndpoints.put(node.id, new NodeEndpoint() + .setNodeId(node.id) + .setHost(node.host) + .setPort(node.port) + .setRack(node.rack)) } status.currentLeader .setLeaderId(leaderNode.leaderId) @@ -511,21 +533,21 @@ class KafkaApis(val requestChannel: RequestChannel, // the producer client will know that some error has happened and will refresh its metadata if (errorInResponse) { val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => - topicPartition -> status.error.exceptionName + topicPartition -> Errors.forCode(status.errorCode).exceptionName }.mkString(", ") info( s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " + s"from client id ${request.header.clientId} with ack=0\n" + s"Topic and partition to exceptions: $exceptionsSummary" ) - requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts) + requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava, util.List.of, AbstractResponse.DEFAULT_THROTTLE_TIME).errorCounts) } else { // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to // bandwidth quota violation. requestHelper.sendNoOpResponseExemptThrottle(request) } } else { - requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, nodeEndpoints.values.toList.asJava), None) + requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, nodeEndpoints.values.toList.asJava, maxThrottleTimeMs), None) } } @@ -1839,7 +1861,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestLocal = requestLocal, responseCallback = errors => { errors.forEach { (topicIdPartition, partitionResponse) => - addResultAndMaybeComplete(topicIdPartition.topicPartition(), partitionResponse.error) + addResultAndMaybeComplete(topicIdPartition.topicPartition(), Errors.forCode(partitionResponse.errorCode)) } }, transactionVersion = markerTransactionVersion diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b109d8d411b..3aa997f97c7 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult} import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribeProducersResponseData} +import org.apache.kafka.common.message.ProduceResponseData.{BatchIndexAndErrorMessage, PartitionProduceResponse} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors @@ -40,7 +41,6 @@ import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView import org.apache.kafka.common.replica._ import org.apache.kafka.common.requests.FetchRequest.PartitionData -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.internals.Exit import org.apache.kafka.common.utils.{Time, Utils} @@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Future, RejectedExecutionException, TimeUnit} import java.util.{Collections, Optional, OptionalInt, OptionalLong} import java.util.function.Consumer +import java.util.stream.Collectors import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.jdk.CollectionConverters._ import scala.jdk.FunctionConverters.enrichAsJavaConsumer @@ -640,7 +641,7 @@ class ReplicaManager(val config: KafkaConfig, internalTopicsAllowed: Boolean, origin: AppendOrigin, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], - responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit, + responseCallback: util.Map[TopicIdPartition, PartitionProduceResponse] => Unit, recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty, @@ -699,7 +700,7 @@ class ReplicaManager(val config: KafkaConfig, internalTopicsAllowed: Boolean, transactionalId: String, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], - responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, + responseCallback: Map[TopicIdPartition, PartitionProduceResponse] => Unit, recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, transactionSupportedOperation: TransactionSupportedOperation): Unit = { @@ -760,7 +761,7 @@ class ReplicaManager(val config: KafkaConfig, val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus } - def newResponseCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = { + def newResponseCallback(responses: util.Map[TopicIdPartition, PartitionProduceResponse]): Unit = { responseCallback(preAppendPartitionResponses ++ responses.asScala) } @@ -838,14 +839,19 @@ class ReplicaManager(val config: KafkaConfig, results.map { case (topicIdPartition, result) => topicIdPartition -> new ProducePartitionStatus( result.logAppendSummary.lastOffset + 1, // required offset - new PartitionResponse( - result.error, - result.logAppendSummary.firstOffset, - result.logAppendSummary.logAppendTime, - result.logAppendSummary.logStartOffset, - result.logAppendSummary.recordErrors, - result.errorMessage - ) + new PartitionProduceResponse() + .setIndex(topicIdPartition.partition) + .setErrorCode(result.error.code) + .setErrorMessage(result.errorMessage) + .setBaseOffset(result.logAppendSummary.firstOffset) + .setLogAppendTimeMs(result.logAppendSummary.logAppendTime) + .setLogStartOffset(result.logAppendSummary.logStartOffset) + .setRecordErrors(result.logAppendSummary.recordErrors + .stream() + .map(e => new BatchIndexAndErrorMessage() + .setBatchIndex(e.batchIndex) + .setBatchIndexErrorMessage(e.message)) + .collect(Collectors.toList())) ) } } @@ -881,7 +887,7 @@ class ReplicaManager(val config: KafkaConfig, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], initialAppendResults: Map[TopicIdPartition, LogAppendResult], initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus], - responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit, + responseCallback: util.Map[TopicIdPartition, PartitionProduceResponse] => Unit, ): Unit = { if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) { // Create delayed produce operation @@ -911,7 +917,7 @@ class ReplicaManager(val config: KafkaConfig, delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.asJava) } else { // we can respond immediately - val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse] + val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionProduceResponse] initialProduceStatus.foreach { case (k, status) => produceResponseStatus.put(k, status.responseStatus) } responseCallback(produceResponseStatus) } @@ -919,16 +925,17 @@ class ReplicaManager(val config: KafkaConfig, private def sendInvalidRequiredAcksResponse( entries: Map[TopicIdPartition, MemoryRecords], - responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit): Unit = { + responseCallback: util.Map[TopicIdPartition, PartitionProduceResponse] => Unit): Unit = { // If required.acks is outside accepted range, something is wrong with the client // Just return an error and don't handle the request at all - val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse] + val responseStatus = new util.HashMap[TopicIdPartition, PartitionProduceResponse] entries.foreach { case(topicIdPartition, _) => - responseStatus.put(topicIdPartition, new PartitionResponse( - Errors.INVALID_REQUIRED_ACKS, - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset, - RecordBatch.NO_TIMESTAMP, - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset) + responseStatus.put(topicIdPartition, new PartitionProduceResponse() + .setIndex(topicIdPartition.partition()) + .setBaseOffset(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset) + .setLogStartOffset(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.INVALID_REQUIRED_ACKS.code()) ) } responseCallback(responseStatus) diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala index 71f33100094..d4047f51e9e 100644 --- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala @@ -24,11 +24,11 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, TopicRecord} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord} -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} import org.apache.kafka.metadata.KRaftMetadataCache @@ -133,23 +133,23 @@ class LocalLeaderEndPointTest extends Logging { @Test def testFetchLatestOffset(): Unit = { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, 0)) bumpLeaderEpoch() appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) assertEquals(new OffsetAndEpoch(6L, 1), endPoint.fetchLatestOffset(topicPartition, 7)) } @Test def testFetchEarliestOffset(): Unit = { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, 0)) bumpLeaderEpoch() appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ()) assertEquals(new OffsetAndEpoch(3L, 1), endPoint.fetchEarliestOffset(topicPartition, 7)) } @@ -157,12 +157,12 @@ class LocalLeaderEndPointTest extends Logging { @Test def testFetchEarliestLocalOffset(): Unit = { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, 0)) bumpLeaderEpoch() appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) replicaManager.logManager.getLog(topicPartition).ifPresent(_.updateLocalLogStartOffset(3)) assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, 7)) assertEquals(new OffsetAndEpoch(3L, 1), endPoint.fetchEarliestLocalOffset(topicPartition, 7)) @@ -171,7 +171,7 @@ class LocalLeaderEndPointTest extends Logging { @Test def testFetchEpochEndOffsets(): Unit = { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) var result = endPoint.fetchEpochEndOffsets(JMap.of( topicPartition, new OffsetForLeaderPartition() @@ -195,7 +195,7 @@ class LocalLeaderEndPointTest extends Logging { assertEquals(2, replicaManager.getPartitionOrException(topicPartition).getLeaderEpoch) appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) result = endPoint.fetchEpochEndOffsets(JMap.of( topicPartition, new OffsetForLeaderPartition() @@ -251,7 +251,7 @@ class LocalLeaderEndPointTest extends Logging { def testEarliestPendingUploadOffsetWhenNoSegmentsUploaded(): Unit = { // Append some records; no remote upload happened yet appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) val expected = endPoint.fetchEarliestOffset(topicPartition, 0) val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0) @@ -261,7 +261,7 @@ class LocalLeaderEndPointTest extends Logging { @Test def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStart(): Unit = { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) // Bump epoch and advance local log start offset without changing log start offset bumpLeaderEpoch() @@ -274,7 +274,7 @@ class LocalLeaderEndPointTest extends Logging { @Test def testEarliestPendingUploadOffsetWhenHighestRemoteOffsetKnown(): Unit = { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) // Highest remote is 1 => earliest pending should be max(1+1, logStart) val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException @@ -291,7 +291,7 @@ class LocalLeaderEndPointTest extends Logging { def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStartWithKnownRemoteOffset(): Unit = { // Append records to create initial log state appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException @@ -315,7 +315,7 @@ class LocalLeaderEndPointTest extends Logging { // Append 12 records (offsets 0-11) for (_ <- 1 to 4) { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) } // Delete records to advance logStartOffset to 10 @@ -342,7 +342,7 @@ class LocalLeaderEndPointTest extends Logging { // Append 18 records (offsets 0-17) for (_ <- 1 to 6) { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) } // Delete records to advance logStartOffset to 10 @@ -369,7 +369,7 @@ class LocalLeaderEndPointTest extends Logging { // Append 12 records (offsets 0-11) for (_ <- 1 to 4) { appendRecords(replicaManager, topicIdPartition, records) - .onFire(response => assertEquals(Errors.NONE, response.error)) + .onFire(response => assertEquals(Errors.NONE.code, response.errorCode)) } // Delete records to advance both logStartOffset and localLogStartOffset to 10 @@ -430,9 +430,9 @@ class LocalLeaderEndPointTest extends Logging { partition: TopicIdPartition, records: MemoryRecords, origin: AppendOrigin = AppendOrigin.CLIENT, - requiredAcks: Short = -1): CallbackResult[PartitionResponse] = { - val result = new CallbackResult[PartitionResponse]() - def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): Unit = { + requiredAcks: Short = -1): CallbackResult[PartitionProduceResponse] = { + val result = new CallbackResult[PartitionProduceResponse]() + def appendCallback(responses: JMap[TopicIdPartition, PartitionProduceResponse]): Unit = { val response = responses.get(partition) assertNotNull(response) result.fire(response) diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index abb3f6d4601..e5d1e5cb5d5 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -26,10 +26,10 @@ import kafka.cluster.Partition import kafka.server.QuotaFactory.QuotaManagers import kafka.server._ import kafka.utils._ +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch} -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.purgatory.DelayedProduce.ProducePartitionStatus @@ -216,7 +216,7 @@ object AbstractCoordinatorConcurrencyTest { internalTopicsAllowed: Boolean, origin: AppendOrigin, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], - responseCallback: java.util.Map[TopicIdPartition, PartitionResponse] => Unit, + responseCallback: java.util.Map[TopicIdPartition, PartitionProduceResponse] => Unit, processingStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.noCaching, verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty, @@ -226,7 +226,12 @@ object AbstractCoordinatorConcurrencyTest { return val produceStatus = entriesPerPartition.map { case (tp, _) => - (tp, new ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) + (tp, new ProducePartitionStatus(0L, new PartitionProduceResponse() + .setIndex(tp.partition) + .setErrorCode(Errors.NONE.code) + .setBaseOffset(0) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(0))) }.asJava // It is safe to set the third parameter to null because it is only used in tryComplete(). @@ -243,7 +248,12 @@ object AbstractCoordinatorConcurrencyTest { override def onComplete(): Unit = { responseCallback(entriesPerPartition.map { case (tp, _) => - (tp, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)) + (tp, new PartitionProduceResponse() + .setIndex(tp.partition) + .setErrorCode(Errors.NONE.code) + .setBaseOffset(0) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(0)) }.asJava) } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 84c58f40c87..18e97da79c1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -26,11 +26,11 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.errors.InvalidRegularExpression import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics} import org.apache.kafka.common.protocol.{Errors, MessageUtil} import org.apache.kafka.common.record.internal._ import org.apache.kafka.common.record.TimestampType -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.MockTime import org.apache.kafka.coordinator.transaction.{TransactionLog, TransactionMetadata, TransactionState, TxnTransitMetadata} @@ -1106,7 +1106,7 @@ class TransactionStateManagerTest { capturedAppends: mutable.Map[TopicIdPartition, mutable.Buffer[MemoryRecords]] ): Unit = { val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]]) - val callbackCapture: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit]) + val callbackCapture: ArgumentCaptor[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit]) when(replicaManager.appendRecords( anyLong(), @@ -1129,7 +1129,12 @@ class TransactionStateManagerTest { batches += records - topicPartition -> new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L) + topicPartition -> new PartitionProduceResponse() + .setIndex(topicPartition.partition) + .setErrorCode(appendError.code) + .setBaseOffset(0) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(0) }.toMap.asJava )) } @@ -1261,7 +1266,7 @@ class TransactionStateManagerTest { private def prepareForTxnMessageAppend(error: Errors): Unit = { reset(replicaManager) - val capturedArgument: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit]) + val capturedArgument: ArgumentCaptor[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit]) when(replicaManager.appendRecords(anyLong(), anyShort(), internalTopicsAllowed = ArgumentMatchers.eq(true), @@ -1274,7 +1279,12 @@ class TransactionStateManagerTest { any() )).thenAnswer(_ => capturedArgument.getValue.apply( util.Map.of(new TopicIdPartition(transactionTopicId, partitionId, TRANSACTION_STATE_TOPIC_NAME), - new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L))) + new PartitionProduceResponse() + .setIndex(partitionId) + .setErrorCode(error.code) + .setBaseOffset(0) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(0))) ) when(replicaManager.topicIdPartition(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 0))).thenReturn(new TopicIdPartition(transactionTopicId, 0, TRANSACTION_STATE_TOPIC_NAME)) when(replicaManager.topicIdPartition(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 1))).thenReturn(new TopicIdPartition(transactionTopicId, 1, TRANSACTION_STATE_TOPIC_NAME)) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3fdcdaf3e49..4aa7b883050 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -56,6 +56,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection} +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.message.ShareFetchRequestData.{AcknowledgementBatch, ForgottenTopic} import org.apache.kafka.common.message.ShareFetchResponseData.{AcquiredRecords, PartitionData, ShareFetchableTopicResponse} import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord} @@ -69,7 +70,6 @@ import org.apache.kafka.common.record.internal._ import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} @@ -2448,7 +2448,7 @@ class KafkaApisTest extends Logging { reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) - val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] => Unit]) val produceData = new ProduceRequestData.TopicProduceData() .setPartitionData(util.List.of( @@ -2480,7 +2480,12 @@ class KafkaApisTest extends Logging { any(), any(), any() - )).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH)))) + )).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionProduceResponse() + .setIndex(tp.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code)))) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) @@ -2839,7 +2844,7 @@ class KafkaApisTest extends Logging { reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) - val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] => Unit]) val tp = new TopicIdPartition(topicId, 0, topic) val partition = mock(classOf[Partition]) @@ -2874,7 +2879,12 @@ class KafkaApisTest extends Logging { any(), any(), any()) - ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)))) + ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionProduceResponse() + .setIndex(tp.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)))) when(replicaManager.getPartitionOrError(tp.topicPartition())).thenAnswer(_ => Right(partition)) when(partition.leaderReplicaIdOpt).thenAnswer(_ => Some(newLeaderId)) @@ -2913,7 +2923,7 @@ class KafkaApisTest extends Logging { reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) - val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] => Unit]) val tp = new TopicIdPartition(topicId, 0, topic) @@ -2945,7 +2955,12 @@ class KafkaApisTest extends Logging { any(), any(), any()) - ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)))) + ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionProduceResponse() + .setIndex(tp.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)))) when(replicaManager.getPartitionOrError(tp.topicPartition())).thenAnswer(_ => Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)) @@ -2983,7 +2998,7 @@ class KafkaApisTest extends Logging { reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) - val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionProduceResponse] => Unit]) val tp = new TopicIdPartition(topicId, 0, topic) @@ -3017,7 +3032,12 @@ class KafkaApisTest extends Logging { any(), any(), any()) - ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)))) + ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionProduceResponse() + .setIndex(tp.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)))) when(replicaManager.getPartitionOrError(tp.topicPartition)).thenAnswer(_ => Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)) @@ -3223,7 +3243,7 @@ class KafkaApisTest extends Logging { val expectedErrors = util.Map.of(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2, Errors.NONE) val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse]) - val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit] = ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit]) when(replicaManager.onlinePartition(tp1)) .thenReturn(None) @@ -3241,7 +3261,12 @@ class KafkaApisTest extends Logging { ArgumentMatchers.eq(requestLocal), any(), any() - )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE)))) + )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new TopicIdPartition(topicId,tp2), new PartitionProduceResponse() + .setIndex(tp2.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.NONE.code)))) kafkaApis = createKafkaApis() kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal) verify(requestChannel).sendResponse( @@ -3364,8 +3389,8 @@ class KafkaApisTest extends Logging { val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]]) - val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] = - ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit] = + ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit]) when(replicaManager.appendRecords( ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong), @@ -3381,7 +3406,12 @@ class KafkaApisTest extends Logging { )).thenAnswer { _ => responseCallback.getValue.apply( entriesPerPartition.getValue.keySet.map { tp => - tp -> new PartitionResponse(Errors.NONE) + tp -> new PartitionProduceResponse() + .setIndex(tp.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.NONE.code) }.toMap.asJava ) } @@ -3535,8 +3565,8 @@ class KafkaApisTest extends Logging { // Set up appendRecords to simulate epoch validation failure val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]]) - val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionResponse] => Unit] = - ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse] => Unit]) + val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit] = + ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionProduceResponse] => Unit]) when(replicaManager.appendRecords( ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong), @@ -3553,7 +3583,12 @@ class KafkaApisTest extends Logging { // Simulate epoch validation failure by calling callback with INVALID_PRODUCER_EPOCH error val topicIdPartition = new TopicIdPartition(topicId, topicPartition) responseCallback.getValue.apply( - util.Map.of(topicIdPartition, new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH)) + util.Map.of(topicIdPartition, new PartitionProduceResponse() + .setIndex(topicIdPartition.partition) + .setBaseOffset(ProduceResponse.INVALID_OFFSET) + .setLogStartOffset(ProduceResponse.INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code)) ) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index 0ed1a999d71..e1aa429ddb5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -26,12 +26,13 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.TestUtils.waitUntilTrue import kafka.utils.{Logging, TestUtils} import org.apache.kafka.common +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.internal.SimpleRecord import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata -import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse} +import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid} @@ -295,10 +296,10 @@ class ReplicaManagerConcurrencyTest extends Logging { new SimpleRecord(s"$clientId-${sequence + i}".getBytes) } - val future = new CompletableFuture[ProduceResponse.PartitionResponse]() + val future = new CompletableFuture[PartitionProduceResponse]() val topicIdPartition: common.TopicIdPartition = replicaManager.topicIdPartition(topicPartition) - def produceCallback(results: util.Map[common.TopicIdPartition, ProduceResponse.PartitionResponse]): Unit = { + def produceCallback(results: util.Map[common.TopicIdPartition, PartitionProduceResponse]): Unit = { try { assertEquals(1, results.size) @@ -307,7 +308,7 @@ class ReplicaManagerConcurrencyTest extends Logging { val result = entry.getValue assertEquals(topicIdPartition, topicPartition) - assertEquals(Errors.NONE, result.error) + assertEquals(Errors.NONE.code, result.errorCode) future.complete(result) } catch { case e: Throwable => future.completeExceptionally(e) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9639d1e46aa..909b8dad4b0 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InvalidPidMappingException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.{DeleteRecordsResponseData, FetchResponseData, ShareFetchResponseData} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Monitorable @@ -47,7 +48,6 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView} import org.apache.kafka.common.requests.FetchRequest.PartitionData -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{LogContext, Time, Utils} @@ -263,8 +263,8 @@ class ReplicaManagerTest { logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), alterPartitionManager = alterPartitionManager) try { - def callback(responseStatus: util.Map[TopicIdPartition, PartitionResponse]): Unit = { - assert(responseStatus.values().iterator().next().error == Errors.INVALID_REQUIRED_ACKS) + def callback(responseStatus: util.Map[TopicIdPartition, PartitionProduceResponse]): Unit = { + assert(responseStatus.values().iterator().next().errorCode == Errors.INVALID_REQUIRED_ACKS.code) } rm.appendRecords( timeout = 0, @@ -436,7 +436,7 @@ class ReplicaManagerTest { val records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("first message".getBytes())) val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response => - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, response.errorCode) } // Make this replica the follower @@ -571,7 +571,7 @@ class ReplicaManagerTest { val records = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -583,7 +583,7 @@ class ReplicaManagerTest { val record = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, epoch, outOfRangeSequence, new SimpleRecord(s"message: $outOfRangeSequence".getBytes)) appendRecords(replicaManager, new TopicPartition(topic, 0), record).onFire { response => - assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, response.error) + assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.code, response.errorCode) assertEquals(0, response.logStartOffset) } @@ -628,7 +628,7 @@ class ReplicaManagerTest { val records = MemoryRecords.withIdempotentRecords(Compression.NONE, pid, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) appendRecords(replicaManager, new TopicPartition(topic, partition), records).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -705,7 +705,7 @@ class ReplicaManagerTest { val records = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, epoch, sequence, new SimpleRecord(time.milliseconds(), s"message $sequence".getBytes)) handleProduceAppend(replicaManager, new TopicPartition(topic, 0), records, transactionalId = transactionalId).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } assertLateTransactionCount(Some(0)) @@ -720,7 +720,7 @@ class ReplicaManagerTest { val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, abortTxnMarker) appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, origin = AppendOrigin.COORDINATOR, transactionVersion = TransactionVersion.TV_0.featureLevel()).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } assertLateTransactionCount(Some(0)) } finally { @@ -760,7 +760,7 @@ class ReplicaManagerTest { val records = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) handleProduceAppend(replicaManager, new TopicPartition(topic, 0), records, transactionalId = transactionalId).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -805,7 +805,7 @@ class ReplicaManagerTest { val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch, origin = AppendOrigin.COORDINATOR, transactionVersion = TransactionVersion.TV_0.featureLevel()) - .onFire { response => assertEquals(Errors.NONE, response.error) } + .onFire { response => assertEquals(Errors.NONE.code, response.errorCode) } // the LSO has advanced, but the appended commit marker has not been replicated, so // none of the data from the transaction should be visible yet @@ -871,7 +871,7 @@ class ReplicaManagerTest { val records = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) handleProduceAppend(replicaManager, new TopicPartition(topic, 0), records, transactionalId = transactionalId).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -880,7 +880,7 @@ class ReplicaManagerTest { val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, origin = AppendOrigin.COORDINATOR, transactionVersion = TransactionVersion.TV_0.featureLevel()) - .onFire { response => assertEquals(Errors.NONE, response.error) } + .onFire { response => assertEquals(Errors.NONE.code, response.errorCode) } // fetch as follower to advance the high watermark fetchPartitionAsFollower( @@ -942,7 +942,7 @@ class ReplicaManagerTest { for (i <- 1 to 2) { val records = TestUtils.singletonRecords(s"message $i".getBytes) appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -999,7 +999,7 @@ class ReplicaManagerTest { // Leader appends some data for (i <- 1 to 5) { appendRecords(replicaManager, tp, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -1148,10 +1148,10 @@ class ReplicaManagerTest { // Append a couple of messages. for (i <- 1 to 2) { appendRecords(replicaManager, tp0, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } appendRecords(replicaManager, tp1, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } @@ -1419,7 +1419,7 @@ class ReplicaManagerTest { replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply())) appendRecords(replicaManager, tp0, TestUtils.singletonRecords(s"message".getBytes)).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } // Fetch as follower to initialise the log end offset of the replica fetchPartitionAsFollower( @@ -1897,7 +1897,7 @@ class ReplicaManagerTest { // Confirm we did not write to the log and instead returned error. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE)) - assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error) + assertEquals(Errors.INVALID_TXN_STATE.code, result.assertFired.errorCode) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) // This time verification is successful. @@ -1970,7 +1970,7 @@ class ReplicaManagerTest { if (error != Errors.CONCURRENT_TRANSACTIONS) { // NOT_COORDINATOR is converted to NOT_ENOUGH_REPLICAS - assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, result.assertFired.errorCode) } else { // The append should not finish with error, it should retry later. assertFalse(result.hasFired) @@ -2033,7 +2033,7 @@ class ReplicaManagerTest { // Confirm we did not write to the log and instead returned error. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue callback.complete(util.Map.of(tp0, Errors.INVALID_PRODUCER_ID_MAPPING)) - assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, result.assertFired.error) + assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING.code, result.assertFired.errorCode) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) // Try to append a higher sequence (1) after the first one failed with a retriable error. @@ -2056,7 +2056,7 @@ class ReplicaManagerTest { val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue callback2.complete(util.Map.of()) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) - assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, result2.assertFired.error) + assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.code, result2.assertFired.errorCode) } finally { replicaManager.shutdown(checkpointHW = false) } @@ -2126,7 +2126,7 @@ class ReplicaManagerTest { verifyNoMoreInteractions(addPartitionsToTxnManager) // broker returns the fencing error - assertEquals(Errors.INVALID_PRODUCER_EPOCH, result2.assertFired.error) + assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, result2.assertFired.errorCode) } finally { replicaManager.shutdown(checkpointHW = false) } @@ -2154,7 +2154,7 @@ class ReplicaManagerTest { handleProduceAppendToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> transactionalRecords), transactionalId).onFire { responses => responses.foreach { - entry => assertEquals(Errors.NONE, entry._2.error) + entry => assertEquals(Errors.NONE.code, entry._2.errorCode) } } } finally { @@ -2213,7 +2213,7 @@ class ReplicaManagerTest { // We should not add these partitions to the manager to verify, but instead throw an error. handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId).onFire { response => - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, response.errorCode) } verify(addPartitionsToTxnManager, times(0)).addOrVerifyTransaction(any(), any(), any(), any(), any(), any()) } finally { @@ -2245,7 +2245,7 @@ class ReplicaManagerTest { val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) handleProduceAppend(replicaManager, tp, transactionalRecords, transactionalId = transactionalId).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId)) @@ -2317,7 +2317,7 @@ class ReplicaManagerTest { // Confirm we did not write to the log and instead returned error. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue callback.complete(util.Map.of(tp0, Errors.INVALID_TXN_STATE)) - assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error) + assertEquals(Errors.INVALID_TXN_STATE.code, result.assertFired.errorCode) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) // This time we do not verify @@ -2372,7 +2372,7 @@ class ReplicaManagerTest { // Confirm we did not write to the log and instead returned the converted error with the correct error message. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue callback.complete(util.Map.of(tp0, error)) - assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, result.assertFired.errorCode) assertEquals(expectedMessage, result.assertFired.errorMessage) } finally { replicaManager.shutdown(checkpointHW = false) @@ -2422,7 +2422,7 @@ class ReplicaManagerTest { // Confirm we did not write to the log and instead returned the converted error with the correct error message. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue callback.complete(util.Map.of(tp0, error)) - assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, result.assertFired.errorCode) assertEquals(expectedMessage, result.assertFired.errorMessage) } finally { replicaManager.shutdown(checkpointHW = false) @@ -2459,9 +2459,9 @@ class ReplicaManagerTest { replicaManager: ReplicaManager, topicPartition: TopicIdPartition, numOfRecords: Int - ): AtomicReference[PartitionResponse] = { - val produceResult = new AtomicReference[PartitionResponse]() - def callback(response: util.Map[TopicIdPartition, PartitionResponse]): Unit = { + ): AtomicReference[PartitionProduceResponse] = { + val produceResult = new AtomicReference[PartitionProduceResponse]() + def callback(response: util.Map[TopicIdPartition, PartitionProduceResponse]): Unit = { produceResult.set(response.get(topicPartition)) } @@ -2705,10 +2705,10 @@ class ReplicaManagerTest { records: MemoryRecords, origin: AppendOrigin = AppendOrigin.CLIENT, requiredAcks: Short = -1, - transactionVersion: Short = TransactionVersion.TV_UNKNOWN): CallbackResult[PartitionResponse] = { - val result = new CallbackResult[PartitionResponse]() + transactionVersion: Short = TransactionVersion.TV_UNKNOWN): CallbackResult[PartitionProduceResponse] = { + val result = new CallbackResult[PartitionProduceResponse]() val topicIdPartition = new TopicIdPartition(topicId, partition) - def appendCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = { + def appendCallback(responses: util.Map[TopicIdPartition, PartitionProduceResponse]): Unit = { val response = responses.get(topicIdPartition) assertNotNull(response) result.fire(response) @@ -2732,9 +2732,9 @@ class ReplicaManagerTest { transactionalId: String, requiredAcks: Short = -1, transactionSupportedOperation: TransactionSupportedOperation = GENERIC_ERROR_SUPPORTED - ): CallbackResult[Map[TopicIdPartition, PartitionResponse]] = { - val result = new CallbackResult[Map[TopicIdPartition, PartitionResponse]]() - def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): Unit = { + ): CallbackResult[Map[TopicIdPartition, PartitionProduceResponse]] = { + val result = new CallbackResult[Map[TopicIdPartition, PartitionProduceResponse]]() + def appendCallback(responses: Map[TopicIdPartition, PartitionProduceResponse]): Unit = { responses.foreach( response => assertTrue(responses.get(response._1).isDefined)) result.fire(responses) } @@ -2759,11 +2759,11 @@ class ReplicaManagerTest { requiredAcks: Short = -1, transactionalId: String, transactionSupportedOperation: TransactionSupportedOperation = GENERIC_ERROR_SUPPORTED - ): CallbackResult[PartitionResponse] = { - val result = new CallbackResult[PartitionResponse]() + ): CallbackResult[PartitionProduceResponse] = { + val result = new CallbackResult[PartitionProduceResponse]() val topicIdPartition = new TopicIdPartition(topicIds.get(partition.topic()).getOrElse(Uuid.ZERO_UUID), partition) - def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): Unit = { + def appendCallback(responses: Map[TopicIdPartition, PartitionProduceResponse]): Unit = { val response = responses.get(topicIdPartition) assertTrue(response.isDefined) result.fire(response.get) @@ -4521,7 +4521,7 @@ class ReplicaManagerTest { new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, Optional.empty()), replicaId = otherId ) - assertEquals(Errors.NONE, leaderResponse.get.error) + assertEquals(Errors.NONE.code, leaderResponse.get.errorCode) // Change the local replica to follower val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false) @@ -4530,7 +4530,7 @@ class ReplicaManagerTest { // Append on a follower should fail val followerResponse = sendProducerAppend(replicaManager, topicIdPartition, numOfRecords) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, followerResponse.get.errorCode) // Check the state of that partition and fetcher val followerPartition = getOnlinePartition(replicaManager.getPartition(topicPartition)) @@ -4582,7 +4582,7 @@ class ReplicaManagerTest { val followerResponse = sendProducerAppend(replicaManager, new TopicIdPartition(followerMetadataImage.topics().topicsByName().get("foo").id, topicPartition), numOfRecords) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, followerResponse.get.errorCode) // Change the local replica to leader val leaderTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, true) @@ -4598,7 +4598,7 @@ class ReplicaManagerTest { new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, Optional.empty()), replicaId = otherId ) - assertEquals(Errors.NONE, leaderResponse.get.error) + assertEquals(Errors.NONE.code, leaderResponse.get.errorCode) val leaderPartition = getOnlinePartition(replicaManager.getPartition(topicPartition)) assertTrue(leaderPartition.isLeader) @@ -4898,7 +4898,7 @@ class ReplicaManagerTest { } // Check that the produce failed because it changed to follower before replicating - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, leaderResponse.get.errorCode) } finally { replicaManager.shutdown(checkpointHW = false) } @@ -5397,7 +5397,7 @@ class ReplicaManagerTest { // Leader appends some data for (i <- 1 to 5) { appendRecords(replicaManager, tp, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => - assertEquals(Errors.NONE, response.error) + assertEquals(Errors.NONE.code, response.errorCode) } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java index f8aa623cf4b..9d38d6a53bc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java @@ -19,6 +19,7 @@ package org.apache.kafka.jmh.producer; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ProduceResponse; @@ -34,6 +35,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; import java.util.AbstractMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -50,17 +52,18 @@ public class ProducerResponseBenchmark { private static final int NUMBER_OF_RECORDS = 3; private static final Uuid TOPIC_ID = Uuid.randomUuid(); private static final String TOPIC_NAME = "tp"; - private static final Map<TopicIdPartition, ProduceResponse.PartitionResponse> PARTITION_RESPONSE_MAP = IntStream.range(0, NUMBER_OF_PARTITIONS) + private static final Map<TopicIdPartition, ProduceResponseData.PartitionProduceResponse> PARTITION_RESPONSE_MAP = IntStream.range(0, NUMBER_OF_PARTITIONS) .mapToObj(partitionIndex -> new AbstractMap.SimpleEntry<>( new TopicIdPartition(TOPIC_ID, partitionIndex, TOPIC_NAME), - new ProduceResponse.PartitionResponse( - Errors.NONE, - 0, - 0, - 0, - IntStream.range(0, NUMBER_OF_RECORDS) - .mapToObj(ProduceResponse.RecordError::new) - .toList()) + new ProduceResponseData.PartitionProduceResponse() + .setIndex(partitionIndex) + .setBaseOffset(0) + .setLogAppendTimeMs(0) + .setLogStartOffset(0) + .setErrorCode(Errors.NONE.code()) + .setRecordErrors(IntStream.range(0, NUMBER_OF_RECORDS) + .mapToObj(i -> new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(i)) + .collect(Collectors.toList())) )) .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); @@ -70,7 +73,7 @@ public class ProducerResponseBenchmark { */ @SuppressWarnings("deprecation") private static ProduceResponse response() { - return new ProduceResponse(PARTITION_RESPONSE_MAP); + return new ProduceResponse(PARTITION_RESPONSE_MAP, List.of(), AbstractResponse.DEFAULT_THROTTLE_TIME); } private static final ProduceResponse RESPONSE = response(); diff --git a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java index b7ce44686d1..6d776d3fcfb 100644 --- a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java +++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java @@ -18,9 +18,9 @@ package org.apache.kafka.server.purgatory; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ProduceResponseData.PartitionProduceResponse; import org.apache.kafka.common.metrics.internals.MetricsUtils; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import com.yammer.metrics.core.Meter; @@ -50,16 +50,16 @@ public class DelayedProduce extends DelayedOperation { public static final class ProducePartitionStatus { private final long requiredOffset; - private final PartitionResponse responseStatus; + private final PartitionProduceResponse responseStatus; private volatile boolean acksPending; - public ProducePartitionStatus(long requiredOffset, PartitionResponse responseStatus) { + public ProducePartitionStatus(long requiredOffset, PartitionProduceResponse responseStatus) { this.requiredOffset = requiredOffset; this.responseStatus = responseStatus; } - public PartitionResponse responseStatus() { + public PartitionProduceResponse responseStatus() { return responseStatus; } @@ -72,8 +72,8 @@ public class DelayedProduce extends DelayedOperation { return String.format( "[acksPending: %s, error: %s, startOffset: %s, requiredOffset: %d]", acksPending, - responseStatus.error.code(), - responseStatus.baseOffset, + responseStatus.errorCode(), + responseStatus.baseOffset(), requiredOffset ); } @@ -95,12 +95,12 @@ public class DelayedProduce extends DelayedOperation { private final Map<TopicIdPartition, ProducePartitionStatus> produceStatus; private final PartitionStatusValidator statusValidator; - private final Consumer<Map<TopicIdPartition, PartitionResponse>> responseCallback; + private final Consumer<Map<TopicIdPartition, PartitionProduceResponse>> responseCallback; public DelayedProduce(long delayMs, Map<TopicIdPartition, ProducePartitionStatus> produceStatus, PartitionStatusValidator statusValidator, - Consumer<Map<TopicIdPartition, PartitionResponse>> responseCallback) { + Consumer<Map<TopicIdPartition, PartitionProduceResponse>> responseCallback) { super(delayMs); this.produceStatus = produceStatus; @@ -109,10 +109,10 @@ public class DelayedProduce extends DelayedOperation { // first update the acks pending variable according to the error code produceStatus.forEach((topicPartition, status) -> { - if (status.responseStatus.error == Errors.NONE) { + if (status.responseStatus.errorCode() == Errors.NONE.code()) { // Timeout error state will be cleared when required acks are received status.acksPending = true; - status.responseStatus.error = Errors.REQUEST_TIMED_OUT; + status.responseStatus.setErrorCode(Errors.REQUEST_TIMED_OUT.code()); } else { status.acksPending = false; } @@ -151,7 +151,7 @@ public class DelayedProduce extends DelayedOperation { Errors errors = result.error; if (errors != Errors.NONE || result.hasEnough()) { status.setAcksPending(false); - status.responseStatus.error = errors; + status.responseStatus.setErrorCode(errors.code()); } } }); @@ -186,7 +186,7 @@ public class DelayedProduce extends DelayedOperation { */ @Override public void onComplete() { - Map<TopicIdPartition, PartitionResponse> responseStatus = new HashMap<>(); + Map<TopicIdPartition, PartitionProduceResponse> responseStatus = new HashMap<>(); for (Map.Entry<TopicIdPartition, ProducePartitionStatus> entry : produceStatus.entrySet()) { responseStatus.put(entry.getKey(), entry.getValue().responseStatus());
