This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e0596d9848 MINOR: Migrate ProduceRequestTest to server module (#22124)
2e0596d9848 is described below

commit 2e0596d9848e743039ae0cafd1a3cd01d79a4666
Author: Yunchi Pang <[email protected]>
AuthorDate: Fri May 1 17:47:34 2026 -0400

    MINOR: Migrate ProduceRequestTest to server module (#22124)
    
    Migrate `ProduceRequestTest` to server module and rewrite it in java.
    
    Reviewers: Ken Huang <[email protected]>, Mickael Maison
     <[email protected]>, Chia-Ping Tsai <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../unit/kafka/server/ProduceRequestTest.scala     | 301 -------------------
 .../kafka/server/requests/ProduceRequestTest.java  | 322 +++++++++++++++++++++
 2 files changed, 322 insertions(+), 301 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
deleted file mode 100644
index 5354c3e1d68..00000000000
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.nio.ByteBuffer
-import java.util.{Collections, Properties}
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, TopicDescription}
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.message.ProduceRequestData
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.util.concurrent.TimeUnit
-import scala.jdk.CollectionConverters._
-
-/**
-  * Subclasses of `BaseProduceSendRequestTest` exercise the producer and 
produce request/response. This class
-  * complements those classes with tests that require lower-level access to 
the protocol.
-  */
-class ProduceRequestTest extends BaseRequestTest {
-
-  val metricsKeySet = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
-
-  @Test
-  def testSimpleProduceRequest(): Unit = {
-    val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-
-    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit 
= {
-      val topicId = getTopicIds().get("topic").get
-      val produceRequest = ProduceRequest.builder(new ProduceRequestData()
-        .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
-          new ProduceRequestData.TopicProduceData()
-            .setTopicId(topicId)
-            .setPartitionData(Collections.singletonList(new 
ProduceRequestData.PartitionProduceData()
-              .setIndex(partition)
-              .setRecords(memoryRecords)))).iterator))
-        .setAcks((-1).toShort)
-        .setTimeoutMs(3000)
-        .setTransactionalId(null)).build()
-      assertEquals(ApiKeys.PRODUCE.latestVersion(), produceRequest.version())
-      val produceResponse = sendProduceRequest(leader, produceRequest)
-      assertEquals(1, produceResponse.data.responses.size)
-      val topicProduceResponse = produceResponse.data.responses.asScala.head
-      assertEquals(1, topicProduceResponse.partitionResponses.size)   
-      val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
-      assertEquals(topicId, topicProduceResponse.topicId())
-      assertEquals(partition, partitionProduceResponse.index())
-      assertEquals(Errors.NONE, 
Errors.forCode(partitionProduceResponse.errorCode))
-      assertEquals(expectedOffset, partitionProduceResponse.baseOffset)
-      assertEquals(-1, partitionProduceResponse.logAppendTimeMs)
-      assertTrue(partitionProduceResponse.recordErrors.isEmpty)
-    }
-
-    sendAndCheck(MemoryRecords.withRecords(Compression.NONE,
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, 
"value".getBytes)), 0)
-
-    sendAndCheck(MemoryRecords.withRecords(Compression.gzip().build(),
-      new SimpleRecord(System.currentTimeMillis(), "key1".getBytes, 
"value1".getBytes),
-      new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, 
"value2".getBytes)), 1)
-  }
-
-  private def getPartitionToLeader(
-    admin: Admin,
-    topic: String
-  ): Map[Int, Int] = {
-    var topicDescription: TopicDescription = null
-    TestUtils.waitUntilTrue(() => {
-      val topicMap = admin.
-        describeTopics(java.util.Arrays.asList(topic)).
-          allTopicNames().get(10, TimeUnit.MINUTES)
-      topicDescription = topicMap.get(topic)
-      topicDescription != null
-    }, "Timed out waiting to describe topic " + topic)
-    topicDescription.partitions().asScala.map(p => {
-      p.partition() -> p.leader().id()
-    }).toMap
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("timestampConfigProvider"))
-  def testProduceWithInvalidTimestamp(messageTimeStampConfig: String, 
recordTimestamp: Long): Unit = {
-    val topic = "topic"
-    val partition = 0
-    val topicConfig = new Properties
-    topicConfig.setProperty(messageTimeStampConfig, "1000")
-    val admin = createAdminClient()
-    TestUtils.createTopicWithAdmin(
-      admin = admin,
-      topic = topic,
-      brokers = brokers,
-      controllers = controllerServers,
-      numPartitions = 1,
-      replicationFactor = 1,
-      topicConfig = topicConfig
-    )
-    val partitionToLeader = getPartitionToLeader(admin, topic)
-    val leader = partitionToLeader(partition)
-    val topicDescription = TestUtils.describeTopic(createAdminClient(), topic)
-
-    def createRecords(magicValue: Byte, timestamp: Long, codec: Compression): 
MemoryRecords = {
-      val buf = ByteBuffer.allocate(512)
-      val builder = MemoryRecords.builder(buf, magicValue, codec, 
TimestampType.CREATE_TIME, 0L)
-      builder.appendWithOffset(0, timestamp, null, "hello".getBytes)
-      builder.appendWithOffset(1, timestamp, null, "there".getBytes)
-      builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes)
-      builder.build()
-    }
-
-    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, recordTimestamp, 
Compression.gzip().build())
-    val topicPartition = new TopicIdPartition(topicDescription.topicId(), 
partition, "topic")
-    val produceResponse = sendProduceRequest(leader, 
ProduceRequest.builder(new ProduceRequestData()
-      .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
-        new ProduceRequestData.TopicProduceData()
-          .setTopicId(topicPartition.topicId())
-          .setPartitionData(Collections.singletonList(new 
ProduceRequestData.PartitionProduceData()
-            .setIndex(topicPartition.partition())
-            .setRecords(records)))).iterator))
-      .setAcks((-1).toShort)
-      .setTimeoutMs(3000)
-      .setTransactionalId(null)).build())
-
-    assertEquals(1, produceResponse.data.responses.size)
-    val topicProduceResponse = produceResponse.data.responses.asScala.head
-    assertEquals(1, topicProduceResponse.partitionResponses.size)   
-    val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
-    val tp = new TopicIdPartition(topicProduceResponse.topicId(),
-      partitionProduceResponse.index,
-      getTopicNames().get(topicProduceResponse.topicId()).getOrElse(""))
-    assertEquals(topicPartition, tp)
-    assertEquals(Errors.INVALID_TIMESTAMP, 
Errors.forCode(partitionProduceResponse.errorCode))
-    // there are 3 records with InvalidTimestampException created from inner 
function createRecords
-    assertEquals(3, partitionProduceResponse.recordErrors.size)
-    val recordErrors = partitionProduceResponse.recordErrors.asScala
-    recordErrors.indices.foreach(i => assertEquals(i, 
recordErrors(i).batchIndex))
-    recordErrors.foreach(recordError => 
assertNotNull(recordError.batchIndexErrorMessage))
-    assertEquals("One or more records have been rejected due to invalid 
timestamp", partitionProduceResponse.errorMessage)
-  }
-
-  @Test
-  def testProduceToNonReplica(): Unit = {
-    val topic = "topic"
-    val partition = 0
-
-    // Create a single-partition topic and find a broker which is not the 
leader
-    val admin = createAdminClient()
-    TestUtils.createTopicWithAdmin(
-      admin = admin,
-      topic = topic,
-      brokers = brokers,
-      controllers = controllerServers
-    )
-    val partitionToLeader = getPartitionToLeader(admin, topic)
-    val leader = partitionToLeader(partition)
-    val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
-    assertTrue(nonReplicaOpt.isDefined)
-    val nonReplicaId =  nonReplicaOpt.get.config.brokerId
-
-    // Send the produce request to the non-replica
-    val records = MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("key".getBytes, "value".getBytes))
-    val produceRequest = ProduceRequest.builder(new ProduceRequestData()
-      .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
-        new ProduceRequestData.TopicProduceData()
-          .setTopicId(getTopicIds().get("topic").get)
-          .setPartitionData(Collections.singletonList(new 
ProduceRequestData.PartitionProduceData()
-            .setIndex(partition)
-            .setRecords(records)))).iterator))
-      .setAcks((-1).toShort)
-      .setTimeoutMs(3000)
-      .setTransactionalId(null)).build()
-
-    val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
-    assertEquals(1, produceResponse.data.responses.size)
-    val topicProduceResponse = produceResponse.data.responses.asScala.head
-    assertEquals(1, topicProduceResponse.partitionResponses.size)   
-    val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
-    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.forCode(partitionProduceResponse.errorCode))
-  }
-
-  /* returns a pair of partition id and leader id */
-  private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) 
= {
-    val partitionToLeader = createTopic(topic, 3, 2)
-    partitionToLeader.collectFirst {
-      case (partition, leader) if leader != -1 => (partition, leader)
-    }.getOrElse(throw new AssertionError(s"No leader elected for topic 
$topic"))
-  }
-
-  @Test
-  def testCorruptLz4ProduceRequest(): Unit = {
-    val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-    val topicId = getTopicIds().get("topic").get
-    val timestamp = 1000000
-    val memoryRecords = MemoryRecords.withRecords(Compression.lz4().build(),
-      new SimpleRecord(timestamp, "key".getBytes, "value".getBytes))
-    // Change the lz4 checksum value (not the kafka record crc) so that it 
doesn't match the contents
-    val lz4ChecksumOffset = 6
-    memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD 
+ lz4ChecksumOffset, 0)
-    val produceResponse = sendProduceRequest(leader, 
ProduceRequest.builder(new ProduceRequestData()
-      .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
-        new ProduceRequestData.TopicProduceData()
-          .setTopicId(topicId)
-          .setPartitionData(Collections.singletonList(new 
ProduceRequestData.PartitionProduceData()
-            .setIndex(partition)
-            .setRecords(memoryRecords)))).iterator))
-      .setAcks((-1).toShort)
-      .setTimeoutMs(3000)
-      .setTransactionalId(null)).build())
-
-    assertEquals(1, produceResponse.data.responses.size)
-    val topicProduceResponse = produceResponse.data.responses.asScala.head
-    assertEquals(1, topicProduceResponse.partitionResponses.size)   
-    val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
-    assertEquals(topicId, topicProduceResponse.topicId())
-    assertEquals(partition, partitionProduceResponse.index())
-    assertEquals(Errors.CORRUPT_MESSAGE, 
Errors.forCode(partitionProduceResponse.errorCode))
-    assertEquals(-1, partitionProduceResponse.baseOffset)
-    assertEquals(-1, partitionProduceResponse.logAppendTimeMs)
-    
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")),
 1)
-    
assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")
 > 0)
-  }
-
-  @Test
-  def testZSTDProduceRequest(): Unit = {
-    val topic = "topic"
-    val partition = 0
-
-    // Create a single-partition topic compressed with ZSTD
-    val topicConfig = new Properties
-    topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.ZSTD.name)
-    val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
-    val leader = partitionToLeader(partition)
-    val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(),
-      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, 
"value".getBytes))
-    val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = new ProduceRequestData()
-      .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
-        new ProduceRequestData.TopicProduceData()
-          .setName("topic") // This test case is testing producer v.7, no need 
to use topic id
-          .setPartitionData(Collections.singletonList(
-            new ProduceRequestData.PartitionProduceData()
-              .setIndex(partition)
-              .setRecords(memoryRecords))))
-        .iterator))
-      .setAcks((-1).toShort)
-      .setTimeoutMs(3000)
-      .setTransactionalId(null)
-
-    // produce request with v7: works fine!
-    val produceResponse1 = sendProduceRequest(leader, new 
ProduceRequest.Builder(7, 7, partitionRecords).build())
-
-    val topicProduceResponse1 = produceResponse1.data.responses.asScala.head
-    val partitionProduceResponse1 = 
topicProduceResponse1.partitionResponses.asScala.head
-    val tp1 = new TopicPartition(topicProduceResponse1.name, 
partitionProduceResponse1.index)
-    assertEquals(topicPartition, tp1)
-    assertEquals(Errors.NONE, 
Errors.forCode(partitionProduceResponse1.errorCode))
-    assertEquals(0, partitionProduceResponse1.baseOffset)
-    assertEquals(-1, partitionProduceResponse1.logAppendTimeMs)
-  }
-
-  private def sendProduceRequest(leaderId: Int, request: ProduceRequest): 
ProduceResponse = {
-    connectAndReceive[ProduceResponse](request, destination = 
brokerSocketServer(leaderId))
-  }
-
-}
-
-object ProduceRequestTest {
-
-  def timestampConfigProvider: java.util.stream.Stream[Arguments] = {
-    val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
-    java.util.stream.Stream.of[Arguments](
-      Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
-      Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
Long.box(System.currentTimeMillis() + fiveMinutesInMs))
-    )
-  }
-}
diff --git 
a/server/src/test/java/org/apache/kafka/server/requests/ProduceRequestTest.java 
b/server/src/test/java/org/apache/kafka/server/requests/ProduceRequestTest.java
new file mode 100644
index 00000000000..4dd43b609fc
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/requests/ProduceRequestTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.requests;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.record.internal.DefaultRecordBatch;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.IntegrationTestUtils;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Meter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the produce request protocol. These tests require a 
running cluster and
+ * complement the unit tests in 
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java,
+ * which test request construction and serialization in isolation.
+ */
+@ClusterTestDefaults(brokers = 3)
+public class ProduceRequestTest {
+
+    private static final String TOPIC = "topic";
+    private static final long FIVE_HOURS_IN_MS = 
Duration.ofHours(5).toMillis();
+    private static final String INVALID_MESSAGE_CRC_RECORDS_PER_SEC = 
"InvalidMessageCrcRecordsPerSec";
+
+    private final ClusterInstance cluster;
+
+    ProduceRequestTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest
+    public void testSimpleProduceRequest() throws Exception {
+        cluster.createTopic(TOPIC, 3, (short) 2);
+        TopicPartitionInfo partitionAndLeader = findPartitionWithLeader();
+        int partition = partitionAndLeader.partition();
+        int leaderId = partitionAndLeader.leader().id();
+        sendAndCheckProduceResponse(leaderId, partition,
+            MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), 
"value".getBytes())),
+            0L);
+
+        sendAndCheckProduceResponse(leaderId, partition,
+            MemoryRecords.withRecords(Compression.gzip().build(),
+                new SimpleRecord(System.currentTimeMillis(), 
"key1".getBytes(), "value1".getBytes()),
+                new SimpleRecord(System.currentTimeMillis(), 
"key2".getBytes(), "value2".getBytes())),
+            1L);
+    }
+
+    @ClusterTest
+    public void testProduceWithTimestampTooOld() throws Exception {
+        doTestProduceWithInvalidTimestamp(
+            TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
+            System.currentTimeMillis() - FIVE_HOURS_IN_MS
+        );
+    }
+
+    @ClusterTest
+    public void testProduceWithTimestampTooNew() throws Exception {
+        doTestProduceWithInvalidTimestamp(
+            TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
+            System.currentTimeMillis() + FIVE_HOURS_IN_MS
+        );
+    }
+
+    @ClusterTest
+    public void testProduceToNonReplica() throws Exception {
+        cluster.createTopic(TOPIC, 1, (short) 1);
+        int leaderId = cluster.getLeaderBrokerId(new TopicPartition(TOPIC, 0));
+        Uuid topicId = getTopicId();
+
+        int nonReplicaId = cluster.brokers().keySet().stream()
+            .filter(id -> id != leaderId)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("No non-replica broker 
found"));
+
+        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord("key".getBytes(), "value".getBytes()));
+        ProduceRequest request = ProduceRequest.builder(new 
ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                new ProduceRequestData.TopicProduceData()
+                    .setTopicId(topicId)
+                    .setPartitionData(Collections.singletonList(
+                        new ProduceRequestData.PartitionProduceData()
+                            .setIndex(0)
+                            .setRecords(records))))
+                .iterator()))
+            .setAcks((short) -1)
+            .setTimeoutMs(3000)
+            .setTransactionalId(null)).build();
+
+        ProduceResponse response = sendProduceRequest(nonReplicaId, request);
+        assertEquals(1, response.data().responses().size());
+        var topicResponse = response.data().responses().iterator().next();
+        assertEquals(1, topicResponse.partitionResponses().size());
+        var partitionResponse = topicResponse.partitionResponses().get(0);
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.forCode(partitionResponse.errorCode()));
+    }
+
+    @ClusterTest
+    public void testCorruptLz4ProduceRequest() throws Exception {
+        cluster.createTopic(TOPIC, 3, (short) 2);
+        TopicPartitionInfo partitionAndLeader = findPartitionWithLeader();
+        int partition = partitionAndLeader.partition();
+        int leaderId = partitionAndLeader.leader().id();
+        Uuid topicId = getTopicId();
+
+        MemoryRecords memoryRecords = 
MemoryRecords.withRecords(Compression.lz4().build(),
+            new SimpleRecord(1000000L, "key".getBytes(), "value".getBytes()));
+        // Corrupt the lz4 frame checksum (not the kafka record CRC) to 
trigger CORRUPT_MESSAGE
+        int lz4ChecksumOffset = 6;
+        
memoryRecords.buffer().array()[DefaultRecordBatch.RECORD_BATCH_OVERHEAD + 
lz4ChecksumOffset] = 0;
+
+        ProduceRequest request = ProduceRequest.builder(new 
ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                new ProduceRequestData.TopicProduceData()
+                    .setTopicId(topicId)
+                    .setPartitionData(Collections.singletonList(
+                        new ProduceRequestData.PartitionProduceData()
+                            .setIndex(partition)
+                            .setRecords(memoryRecords))))
+                .iterator()))
+            .setAcks((short) -1)
+            .setTimeoutMs(3000)
+            .setTransactionalId(null)).build();
+
+        ProduceResponse response = sendProduceRequest(leaderId, request);
+        assertEquals(1, response.data().responses().size());
+        var topicResponse = response.data().responses().iterator().next();
+        assertEquals(1, topicResponse.partitionResponses().size());
+        var partitionResponse = topicResponse.partitionResponses().get(0);
+        assertEquals(topicId, topicResponse.topicId());
+        assertEquals(partition, partitionResponse.index());
+        assertEquals(Errors.CORRUPT_MESSAGE, 
Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(-1L, partitionResponse.baseOffset());
+        assertEquals(-1L, partitionResponse.logAppendTimeMs());
+
+        long matchingMetricsCount = 
KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+            .filter(k -> 
k.getName().endsWith(INVALID_MESSAGE_CRC_RECORDS_PER_SEC))
+            .count();
+        assertEquals(1, matchingMetricsCount);
+        Meter meter = (Meter) 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+            .filter(e -> 
e.getKey().getName().endsWith(INVALID_MESSAGE_CRC_RECORDS_PER_SEC))
+            .map(Map.Entry::getValue)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("Metric not found: " + 
INVALID_MESSAGE_CRC_RECORDS_PER_SEC));
+        assertTrue(meter.count() > 0);
+    }
+
+    @ClusterTest
+    public void testZSTDProduceRequest() throws Exception {
+        cluster.createTopic(TOPIC, 1, (short) 1,
+            Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd"));
+        int leaderId = cluster.getLeaderBrokerId(new TopicPartition(TOPIC, 0));
+
+        MemoryRecords memoryRecords = 
MemoryRecords.withRecords(Compression.zstd().build(),
+            new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), 
"value".getBytes()));
+
+        // v7 uses topic name rather than topic ID
+        ProduceRequestData data = new ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                new ProduceRequestData.TopicProduceData()
+                    .setName(TOPIC)
+                    .setPartitionData(Collections.singletonList(
+                        new ProduceRequestData.PartitionProduceData()
+                            .setIndex(0)
+                            .setRecords(memoryRecords))))
+                .iterator()))
+            .setAcks((short) -1)
+            .setTimeoutMs(3000)
+            .setTransactionalId(null);
+
+        ProduceResponse response = sendProduceRequest(leaderId, new 
ProduceRequest.Builder((short) 7, (short) 7, data).build());
+        var topicResponse = response.data().responses().iterator().next();
+        var partitionResponse = topicResponse.partitionResponses().get(0);
+        assertEquals(TOPIC, topicResponse.name());
+        assertEquals(0, partitionResponse.index());
+        assertEquals(Errors.NONE, 
Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(0L, partitionResponse.baseOffset());
+        assertEquals(-1L, partitionResponse.logAppendTimeMs());
+    }
+
+    private ProduceResponse sendProduceRequest(int brokerId, ProduceRequest 
request) throws IOException {
+        KafkaBroker broker = cluster.brokers().get(brokerId);
+        int port = broker.socketServer().boundPort(cluster.clientListener());
+        return IntegrationTestUtils.connectAndReceive(request, port);
+    }
+
+    private void sendAndCheckProduceResponse(int leaderId, int partition,
+                                             MemoryRecords records, long 
expectedOffset) throws IOException, ExecutionException, InterruptedException {
+        Uuid topicId = getTopicId();
+        ProduceRequest request = ProduceRequest.builder(new 
ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                new ProduceRequestData.TopicProduceData()
+                    .setTopicId(topicId)
+                    .setPartitionData(Collections.singletonList(
+                        new ProduceRequestData.PartitionProduceData()
+                            .setIndex(partition)
+                            .setRecords(records))))
+                .iterator()))
+            .setAcks((short) -1)
+            .setTimeoutMs(3000)
+            .setTransactionalId(null)).build();
+
+        assertEquals(ApiKeys.PRODUCE.latestVersion(), request.version());
+        ProduceResponse response = sendProduceRequest(leaderId, request);
+        assertEquals(1, response.data().responses().size());
+        var topicResponse = response.data().responses().iterator().next();
+        assertEquals(1, topicResponse.partitionResponses().size());
+        var partitionResponse = topicResponse.partitionResponses().get(0);
+        assertEquals(topicId, topicResponse.topicId());
+        assertEquals(partition, partitionResponse.index());
+        assertEquals(Errors.NONE, 
Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(expectedOffset, partitionResponse.baseOffset());
+        assertEquals(-1L, partitionResponse.logAppendTimeMs());
+        assertTrue(partitionResponse.recordErrors().isEmpty());
+    }
+
+    private void doTestProduceWithInvalidTimestamp(String timestampConfig, 
long recordTimestamp) throws Exception {
+        cluster.createTopic(TOPIC, 1, (short) 1, Map.of(timestampConfig, 
"1000"));
+        int leaderId = cluster.getLeaderBrokerId(new TopicPartition(TOPIC, 0));
+        Uuid topicId = getTopicId();
+
+        ByteBuffer buf = ByteBuffer.allocate(512);
+        var builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V2,
+            Compression.gzip().build(), TimestampType.CREATE_TIME, 0L);
+        builder.appendWithOffset(0, recordTimestamp, null, "hello".getBytes());
+        builder.appendWithOffset(1, recordTimestamp, null, "there".getBytes());
+        builder.appendWithOffset(2, recordTimestamp, null, 
"beautiful".getBytes());
+        MemoryRecords records = builder.build();
+
+        ProduceResponse response = sendProduceRequest(leaderId, 
ProduceRequest.builder(new ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                new ProduceRequestData.TopicProduceData()
+                    .setTopicId(topicId)
+                    .setPartitionData(Collections.singletonList(
+                        new ProduceRequestData.PartitionProduceData()
+                            .setIndex(0)
+                            .setRecords(records))))
+                .iterator()))
+            .setAcks((short) -1)
+            .setTimeoutMs(3000)
+            .setTransactionalId(null)).build());
+
+        assertEquals(1, response.data().responses().size());
+        var topicResponse = response.data().responses().iterator().next();
+        assertEquals(1, topicResponse.partitionResponses().size());
+        var partitionResponse = topicResponse.partitionResponses().get(0);
+        assertEquals(topicId, topicResponse.topicId());
+        assertEquals(0, partitionResponse.index());
+        assertEquals(Errors.INVALID_TIMESTAMP, 
Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(3, partitionResponse.recordErrors().size());
+        for (int i = 0; i < partitionResponse.recordErrors().size(); i++) {
+            assertEquals(i, 
partitionResponse.recordErrors().get(i).batchIndex());
+            
assertNotNull(partitionResponse.recordErrors().get(i).batchIndexErrorMessage());
+        }
+        assertEquals("One or more records have been rejected due to invalid 
timestamp",
+            partitionResponse.errorMessage());
+    }
+
+    private Uuid getTopicId() throws ExecutionException, InterruptedException {
+        try (Admin admin = cluster.admin()) {
+            return admin.describeTopics(List.of(TOPIC))
+                .topicNameValues().get(TOPIC).get().topicId();
+        }
+    }
+
+    private TopicPartitionInfo findPartitionWithLeader() throws 
ExecutionException, InterruptedException {
+        try (Admin admin = cluster.admin()) {
+            TopicDescription desc = admin.describeTopics(List.of(TOPIC))
+                .topicNameValues().get(TOPIC).get();
+            return desc.partitions().stream()
+                .filter(p -> p.leader() != null && p.leader().id() != -1)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("No partition with 
leader found for topic " + TOPIC));
+        }
+    }
+}

Reply via email to