This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e0596d9848 MINOR: Migrate ProduceRequestTest to server module (#22124)
2e0596d9848 is described below
commit 2e0596d9848e743039ae0cafd1a3cd01d79a4666
Author: Yunchi Pang <[email protected]>
AuthorDate: Fri May 1 17:47:34 2026 -0400
MINOR: Migrate ProduceRequestTest to server module (#22124)
Migrate `ProduceRequestTest` to the server module and rewrite it in Java.
Reviewers: Ken Huang <[email protected]>, Mickael Maison
<[email protected]>, Chia-Ping Tsai <[email protected]>
---------
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../unit/kafka/server/ProduceRequestTest.scala | 301 -------------------
.../kafka/server/requests/ProduceRequestTest.java | 322 +++++++++++++++++++++
2 files changed, 322 insertions(+), 301 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
deleted file mode 100644
index 5354c3e1d68..00000000000
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.nio.ByteBuffer
-import java.util.{Collections, Properties}
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, TopicDescription}
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.message.ProduceRequestData
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.util.concurrent.TimeUnit
-import scala.jdk.CollectionConverters._
-
-/**
- * Subclasses of `BaseProduceSendRequestTest` exercise the producer and
produce request/response. This class
- * complements those classes with tests that require lower-level access to
the protocol.
- */
-class ProduceRequestTest extends BaseRequestTest {
-
- val metricsKeySet =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
-
- @Test
- def testSimpleProduceRequest(): Unit = {
- val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-
- def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit
= {
- val topicId = getTopicIds().get("topic").get
- val produceRequest = ProduceRequest.builder(new ProduceRequestData()
- .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new ProduceRequestData.TopicProduceData()
- .setTopicId(topicId)
- .setPartitionData(Collections.singletonList(new
ProduceRequestData.PartitionProduceData()
- .setIndex(partition)
- .setRecords(memoryRecords)))).iterator))
- .setAcks((-1).toShort)
- .setTimeoutMs(3000)
- .setTransactionalId(null)).build()
- assertEquals(ApiKeys.PRODUCE.latestVersion(), produceRequest.version())
- val produceResponse = sendProduceRequest(leader, produceRequest)
- assertEquals(1, produceResponse.data.responses.size)
- val topicProduceResponse = produceResponse.data.responses.asScala.head
- assertEquals(1, topicProduceResponse.partitionResponses.size)
- val partitionProduceResponse =
topicProduceResponse.partitionResponses.asScala.head
- assertEquals(topicId, topicProduceResponse.topicId())
- assertEquals(partition, partitionProduceResponse.index())
- assertEquals(Errors.NONE,
Errors.forCode(partitionProduceResponse.errorCode))
- assertEquals(expectedOffset, partitionProduceResponse.baseOffset)
- assertEquals(-1, partitionProduceResponse.logAppendTimeMs)
- assertTrue(partitionProduceResponse.recordErrors.isEmpty)
- }
-
- sendAndCheck(MemoryRecords.withRecords(Compression.NONE,
- new SimpleRecord(System.currentTimeMillis(), "key".getBytes,
"value".getBytes)), 0)
-
- sendAndCheck(MemoryRecords.withRecords(Compression.gzip().build(),
- new SimpleRecord(System.currentTimeMillis(), "key1".getBytes,
"value1".getBytes),
- new SimpleRecord(System.currentTimeMillis(), "key2".getBytes,
"value2".getBytes)), 1)
- }
-
- private def getPartitionToLeader(
- admin: Admin,
- topic: String
- ): Map[Int, Int] = {
- var topicDescription: TopicDescription = null
- TestUtils.waitUntilTrue(() => {
- val topicMap = admin.
- describeTopics(java.util.Arrays.asList(topic)).
- allTopicNames().get(10, TimeUnit.MINUTES)
- topicDescription = topicMap.get(topic)
- topicDescription != null
- }, "Timed out waiting to describe topic " + topic)
- topicDescription.partitions().asScala.map(p => {
- p.partition() -> p.leader().id()
- }).toMap
- }
-
- @ParameterizedTest
- @MethodSource(Array("timestampConfigProvider"))
- def testProduceWithInvalidTimestamp(messageTimeStampConfig: String,
recordTimestamp: Long): Unit = {
- val topic = "topic"
- val partition = 0
- val topicConfig = new Properties
- topicConfig.setProperty(messageTimeStampConfig, "1000")
- val admin = createAdminClient()
- TestUtils.createTopicWithAdmin(
- admin = admin,
- topic = topic,
- brokers = brokers,
- controllers = controllerServers,
- numPartitions = 1,
- replicationFactor = 1,
- topicConfig = topicConfig
- )
- val partitionToLeader = getPartitionToLeader(admin, topic)
- val leader = partitionToLeader(partition)
- val topicDescription = TestUtils.describeTopic(createAdminClient(), topic)
-
- def createRecords(magicValue: Byte, timestamp: Long, codec: Compression):
MemoryRecords = {
- val buf = ByteBuffer.allocate(512)
- val builder = MemoryRecords.builder(buf, magicValue, codec,
TimestampType.CREATE_TIME, 0L)
- builder.appendWithOffset(0, timestamp, null, "hello".getBytes)
- builder.appendWithOffset(1, timestamp, null, "there".getBytes)
- builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes)
- builder.build()
- }
-
- val records = createRecords(RecordBatch.MAGIC_VALUE_V2, recordTimestamp,
Compression.gzip().build())
- val topicPartition = new TopicIdPartition(topicDescription.topicId(),
partition, "topic")
- val produceResponse = sendProduceRequest(leader,
ProduceRequest.builder(new ProduceRequestData()
- .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new ProduceRequestData.TopicProduceData()
- .setTopicId(topicPartition.topicId())
- .setPartitionData(Collections.singletonList(new
ProduceRequestData.PartitionProduceData()
- .setIndex(topicPartition.partition())
- .setRecords(records)))).iterator))
- .setAcks((-1).toShort)
- .setTimeoutMs(3000)
- .setTransactionalId(null)).build())
-
- assertEquals(1, produceResponse.data.responses.size)
- val topicProduceResponse = produceResponse.data.responses.asScala.head
- assertEquals(1, topicProduceResponse.partitionResponses.size)
- val partitionProduceResponse =
topicProduceResponse.partitionResponses.asScala.head
- val tp = new TopicIdPartition(topicProduceResponse.topicId(),
- partitionProduceResponse.index,
- getTopicNames().get(topicProduceResponse.topicId()).getOrElse(""))
- assertEquals(topicPartition, tp)
- assertEquals(Errors.INVALID_TIMESTAMP,
Errors.forCode(partitionProduceResponse.errorCode))
- // there are 3 records with InvalidTimestampException created from inner
function createRecords
- assertEquals(3, partitionProduceResponse.recordErrors.size)
- val recordErrors = partitionProduceResponse.recordErrors.asScala
- recordErrors.indices.foreach(i => assertEquals(i,
recordErrors(i).batchIndex))
- recordErrors.foreach(recordError =>
assertNotNull(recordError.batchIndexErrorMessage))
- assertEquals("One or more records have been rejected due to invalid timestamp", partitionProduceResponse.errorMessage)
- }
-
- @Test
- def testProduceToNonReplica(): Unit = {
- val topic = "topic"
- val partition = 0
-
- // Create a single-partition topic and find a broker which is not the
leader
- val admin = createAdminClient()
- TestUtils.createTopicWithAdmin(
- admin = admin,
- topic = topic,
- brokers = brokers,
- controllers = controllerServers
- )
- val partitionToLeader = getPartitionToLeader(admin, topic)
- val leader = partitionToLeader(partition)
- val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
- assertTrue(nonReplicaOpt.isDefined)
- val nonReplicaId = nonReplicaOpt.get.config.brokerId
-
- // Send the produce request to the non-replica
- val records = MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("key".getBytes, "value".getBytes))
- val produceRequest = ProduceRequest.builder(new ProduceRequestData()
- .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new ProduceRequestData.TopicProduceData()
- .setTopicId(getTopicIds().get("topic").get)
- .setPartitionData(Collections.singletonList(new
ProduceRequestData.PartitionProduceData()
- .setIndex(partition)
- .setRecords(records)))).iterator))
- .setAcks((-1).toShort)
- .setTimeoutMs(3000)
- .setTransactionalId(null)).build()
-
- val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
- assertEquals(1, produceResponse.data.responses.size)
- val topicProduceResponse = produceResponse.data.responses.asScala.head
- assertEquals(1, topicProduceResponse.partitionResponses.size)
- val partitionProduceResponse =
topicProduceResponse.partitionResponses.asScala.head
- assertEquals(Errors.NOT_LEADER_OR_FOLLOWER,
Errors.forCode(partitionProduceResponse.errorCode))
- }
-
- /* returns a pair of partition id and leader id */
- private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int)
= {
- val partitionToLeader = createTopic(topic, 3, 2)
- partitionToLeader.collectFirst {
- case (partition, leader) if leader != -1 => (partition, leader)
- }.getOrElse(throw new AssertionError(s"No leader elected for topic $topic"))
- }
-
- @Test
- def testCorruptLz4ProduceRequest(): Unit = {
- val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
- val topicId = getTopicIds().get("topic").get
- val timestamp = 1000000
- val memoryRecords = MemoryRecords.withRecords(Compression.lz4().build(),
- new SimpleRecord(timestamp, "key".getBytes, "value".getBytes))
- // Change the lz4 checksum value (not the kafka record crc) so that it
doesn't match the contents
- val lz4ChecksumOffset = 6
- memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD
+ lz4ChecksumOffset, 0)
- val produceResponse = sendProduceRequest(leader,
ProduceRequest.builder(new ProduceRequestData()
- .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new ProduceRequestData.TopicProduceData()
- .setTopicId(topicId)
- .setPartitionData(Collections.singletonList(new
ProduceRequestData.PartitionProduceData()
- .setIndex(partition)
- .setRecords(memoryRecords)))).iterator))
- .setAcks((-1).toShort)
- .setTimeoutMs(3000)
- .setTransactionalId(null)).build())
-
- assertEquals(1, produceResponse.data.responses.size)
- val topicProduceResponse = produceResponse.data.responses.asScala.head
- assertEquals(1, topicProduceResponse.partitionResponses.size)
- val partitionProduceResponse =
topicProduceResponse.partitionResponses.asScala.head
- assertEquals(topicId, topicProduceResponse.topicId())
- assertEquals(partition, partitionProduceResponse.index())
- assertEquals(Errors.CORRUPT_MESSAGE,
Errors.forCode(partitionProduceResponse.errorCode))
- assertEquals(-1, partitionProduceResponse.baseOffset)
- assertEquals(-1, partitionProduceResponse.logAppendTimeMs)
- assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")), 1)
- assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0)
- }
-
- @Test
- def testZSTDProduceRequest(): Unit = {
- val topic = "topic"
- val partition = 0
-
- // Create a single-partition topic compressed with ZSTD
- val topicConfig = new Properties
- topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.ZSTD.name)
- val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
- val leader = partitionToLeader(partition)
- val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(),
- new SimpleRecord(System.currentTimeMillis(), "key".getBytes,
"value".getBytes))
- val topicPartition = new TopicPartition("topic", partition)
- val partitionRecords = new ProduceRequestData()
- .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new ProduceRequestData.TopicProduceData()
- .setName("topic") // This test case is testing producer v.7, no need
to use topic id
- .setPartitionData(Collections.singletonList(
- new ProduceRequestData.PartitionProduceData()
- .setIndex(partition)
- .setRecords(memoryRecords))))
- .iterator))
- .setAcks((-1).toShort)
- .setTimeoutMs(3000)
- .setTransactionalId(null)
-
- // produce request with v7: works fine!
- val produceResponse1 = sendProduceRequest(leader, new
ProduceRequest.Builder(7, 7, partitionRecords).build())
-
- val topicProduceResponse1 = produceResponse1.data.responses.asScala.head
- val partitionProduceResponse1 =
topicProduceResponse1.partitionResponses.asScala.head
- val tp1 = new TopicPartition(topicProduceResponse1.name,
partitionProduceResponse1.index)
- assertEquals(topicPartition, tp1)
- assertEquals(Errors.NONE,
Errors.forCode(partitionProduceResponse1.errorCode))
- assertEquals(0, partitionProduceResponse1.baseOffset)
- assertEquals(-1, partitionProduceResponse1.logAppendTimeMs)
- }
-
- private def sendProduceRequest(leaderId: Int, request: ProduceRequest):
ProduceResponse = {
- connectAndReceive[ProduceResponse](request, destination =
brokerSocketServer(leaderId))
- }
-
-}
-
-object ProduceRequestTest {
-
- def timestampConfigProvider: java.util.stream.Stream[Arguments] = {
- val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
- java.util.stream.Stream.of[Arguments](
- Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
- Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
Long.box(System.currentTimeMillis() + fiveMinutesInMs))
- )
- }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/requests/ProduceRequestTest.java b/server/src/test/java/org/apache/kafka/server/requests/ProduceRequestTest.java
new file mode 100644
index 00000000000..4dd43b609fc
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/requests/ProduceRequestTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.requests;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.record.internal.DefaultRecordBatch;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.IntegrationTestUtils;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Meter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the produce request protocol. These tests require a
running cluster and
+ * complement the unit tests in
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java,
+ * which test request construction and serialization in isolation.
+ */
+@ClusterTestDefaults(brokers = 3)
+public class ProduceRequestTest {
+
+ private static final String TOPIC = "topic";
+ private static final long FIVE_HOURS_IN_MS =
Duration.ofHours(5).toMillis();
+ private static final String INVALID_MESSAGE_CRC_RECORDS_PER_SEC =
"InvalidMessageCrcRecordsPerSec";
+
+ private final ClusterInstance cluster;
+
+ ProduceRequestTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ @ClusterTest
+ public void testSimpleProduceRequest() throws Exception {
+ cluster.createTopic(TOPIC, 3, (short) 2);
+ TopicPartitionInfo partitionAndLeader = findPartitionWithLeader();
+ int partition = partitionAndLeader.partition();
+ int leaderId = partitionAndLeader.leader().id();
+ sendAndCheckProduceResponse(leaderId, partition,
+ MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord(System.currentTimeMillis(), "key".getBytes(),
"value".getBytes())),
+ 0L);
+
+ sendAndCheckProduceResponse(leaderId, partition,
+ MemoryRecords.withRecords(Compression.gzip().build(),
+ new SimpleRecord(System.currentTimeMillis(),
"key1".getBytes(), "value1".getBytes()),
+ new SimpleRecord(System.currentTimeMillis(),
"key2".getBytes(), "value2".getBytes())),
+ 1L);
+ }
+
+ @ClusterTest
+ public void testProduceWithTimestampTooOld() throws Exception {
+ doTestProduceWithInvalidTimestamp(
+ TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
+ System.currentTimeMillis() - FIVE_HOURS_IN_MS
+ );
+ }
+
+ @ClusterTest
+ public void testProduceWithTimestampTooNew() throws Exception {
+ doTestProduceWithInvalidTimestamp(
+ TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
+ System.currentTimeMillis() + FIVE_HOURS_IN_MS
+ );
+ }
+
+ @ClusterTest
+ public void testProduceToNonReplica() throws Exception {
+ cluster.createTopic(TOPIC, 1, (short) 1);
+ int leaderId = cluster.getLeaderBrokerId(new TopicPartition(TOPIC, 0));
+ Uuid topicId = getTopicId();
+
+ int nonReplicaId = cluster.brokers().keySet().stream()
+ .filter(id -> id != leaderId)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("No non-replica broker found"));
+
+ MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("key".getBytes(), "value".getBytes()));
+ ProduceRequest request = ProduceRequest.builder(new
ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setTopicId(topicId)
+ .setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(0)
+ .setRecords(records))))
+ .iterator()))
+ .setAcks((short) -1)
+ .setTimeoutMs(3000)
+ .setTransactionalId(null)).build();
+
+ ProduceResponse response = sendProduceRequest(nonReplicaId, request);
+ assertEquals(1, response.data().responses().size());
+ var topicResponse = response.data().responses().iterator().next();
+ assertEquals(1, topicResponse.partitionResponses().size());
+ var partitionResponse = topicResponse.partitionResponses().get(0);
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER,
Errors.forCode(partitionResponse.errorCode()));
+ }
+
+ @ClusterTest
+ public void testCorruptLz4ProduceRequest() throws Exception {
+ cluster.createTopic(TOPIC, 3, (short) 2);
+ TopicPartitionInfo partitionAndLeader = findPartitionWithLeader();
+ int partition = partitionAndLeader.partition();
+ int leaderId = partitionAndLeader.leader().id();
+ Uuid topicId = getTopicId();
+
+ MemoryRecords memoryRecords =
MemoryRecords.withRecords(Compression.lz4().build(),
+ new SimpleRecord(1000000L, "key".getBytes(), "value".getBytes()));
+ // Corrupt the lz4 frame checksum (not the kafka record CRC) to
trigger CORRUPT_MESSAGE
+ int lz4ChecksumOffset = 6;
+ memoryRecords.buffer().array()[DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset] = 0;
+
+ ProduceRequest request = ProduceRequest.builder(new
ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setTopicId(topicId)
+ .setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(partition)
+ .setRecords(memoryRecords))))
+ .iterator()))
+ .setAcks((short) -1)
+ .setTimeoutMs(3000)
+ .setTransactionalId(null)).build();
+
+ ProduceResponse response = sendProduceRequest(leaderId, request);
+ assertEquals(1, response.data().responses().size());
+ var topicResponse = response.data().responses().iterator().next();
+ assertEquals(1, topicResponse.partitionResponses().size());
+ var partitionResponse = topicResponse.partitionResponses().get(0);
+ assertEquals(topicId, topicResponse.topicId());
+ assertEquals(partition, partitionResponse.index());
+ assertEquals(Errors.CORRUPT_MESSAGE,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(-1L, partitionResponse.baseOffset());
+ assertEquals(-1L, partitionResponse.logAppendTimeMs());
+
+ long matchingMetricsCount =
KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+ .filter(k ->
k.getName().endsWith(INVALID_MESSAGE_CRC_RECORDS_PER_SEC))
+ .count();
+ assertEquals(1, matchingMetricsCount);
+ Meter meter = (Meter)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(e ->
e.getKey().getName().endsWith(INVALID_MESSAGE_CRC_RECORDS_PER_SEC))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Metric not found: " +
INVALID_MESSAGE_CRC_RECORDS_PER_SEC));
+ assertTrue(meter.count() > 0);
+ }
+
+ @ClusterTest
+ public void testZSTDProduceRequest() throws Exception {
+ cluster.createTopic(TOPIC, 1, (short) 1,
+ Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd"));
+ int leaderId = cluster.getLeaderBrokerId(new TopicPartition(TOPIC, 0));
+
+ MemoryRecords memoryRecords =
MemoryRecords.withRecords(Compression.zstd().build(),
+ new SimpleRecord(System.currentTimeMillis(), "key".getBytes(),
"value".getBytes()));
+
+ // v7 uses topic name rather than topic ID
+ ProduceRequestData data = new ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setName(TOPIC)
+ .setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(0)
+ .setRecords(memoryRecords))))
+ .iterator()))
+ .setAcks((short) -1)
+ .setTimeoutMs(3000)
+ .setTransactionalId(null);
+
+ ProduceResponse response = sendProduceRequest(leaderId, new
ProduceRequest.Builder((short) 7, (short) 7, data).build());
+ var topicResponse = response.data().responses().iterator().next();
+ var partitionResponse = topicResponse.partitionResponses().get(0);
+ assertEquals(TOPIC, topicResponse.name());
+ assertEquals(0, partitionResponse.index());
+ assertEquals(Errors.NONE,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(0L, partitionResponse.baseOffset());
+ assertEquals(-1L, partitionResponse.logAppendTimeMs());
+ }
+
+ private ProduceResponse sendProduceRequest(int brokerId, ProduceRequest
request) throws IOException {
+ KafkaBroker broker = cluster.brokers().get(brokerId);
+ int port = broker.socketServer().boundPort(cluster.clientListener());
+ return IntegrationTestUtils.connectAndReceive(request, port);
+ }
+
+ private void sendAndCheckProduceResponse(int leaderId, int partition,
+ MemoryRecords records, long
expectedOffset) throws IOException, ExecutionException, InterruptedException {
+ Uuid topicId = getTopicId();
+ ProduceRequest request = ProduceRequest.builder(new
ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setTopicId(topicId)
+ .setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(partition)
+ .setRecords(records))))
+ .iterator()))
+ .setAcks((short) -1)
+ .setTimeoutMs(3000)
+ .setTransactionalId(null)).build();
+
+ assertEquals(ApiKeys.PRODUCE.latestVersion(), request.version());
+ ProduceResponse response = sendProduceRequest(leaderId, request);
+ assertEquals(1, response.data().responses().size());
+ var topicResponse = response.data().responses().iterator().next();
+ assertEquals(1, topicResponse.partitionResponses().size());
+ var partitionResponse = topicResponse.partitionResponses().get(0);
+ assertEquals(topicId, topicResponse.topicId());
+ assertEquals(partition, partitionResponse.index());
+ assertEquals(Errors.NONE,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(expectedOffset, partitionResponse.baseOffset());
+ assertEquals(-1L, partitionResponse.logAppendTimeMs());
+ assertTrue(partitionResponse.recordErrors().isEmpty());
+ }
+
+ private void doTestProduceWithInvalidTimestamp(String timestampConfig,
long recordTimestamp) throws Exception {
+ cluster.createTopic(TOPIC, 1, (short) 1, Map.of(timestampConfig,
"1000"));
+ int leaderId = cluster.getLeaderBrokerId(new TopicPartition(TOPIC, 0));
+ Uuid topicId = getTopicId();
+
+ ByteBuffer buf = ByteBuffer.allocate(512);
+ var builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V2,
+ Compression.gzip().build(), TimestampType.CREATE_TIME, 0L);
+ builder.appendWithOffset(0, recordTimestamp, null, "hello".getBytes());
+ builder.appendWithOffset(1, recordTimestamp, null, "there".getBytes());
+ builder.appendWithOffset(2, recordTimestamp, null,
"beautiful".getBytes());
+ MemoryRecords records = builder.build();
+
+ ProduceResponse response = sendProduceRequest(leaderId,
ProduceRequest.builder(new ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setTopicId(topicId)
+ .setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(0)
+ .setRecords(records))))
+ .iterator()))
+ .setAcks((short) -1)
+ .setTimeoutMs(3000)
+ .setTransactionalId(null)).build());
+
+ assertEquals(1, response.data().responses().size());
+ var topicResponse = response.data().responses().iterator().next();
+ assertEquals(1, topicResponse.partitionResponses().size());
+ var partitionResponse = topicResponse.partitionResponses().get(0);
+ assertEquals(topicId, topicResponse.topicId());
+ assertEquals(0, partitionResponse.index());
+ assertEquals(Errors.INVALID_TIMESTAMP,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(3, partitionResponse.recordErrors().size());
+ for (int i = 0; i < partitionResponse.recordErrors().size(); i++) {
+ assertEquals(i,
partitionResponse.recordErrors().get(i).batchIndex());
+ assertNotNull(partitionResponse.recordErrors().get(i).batchIndexErrorMessage());
+ }
+ assertEquals("One or more records have been rejected due to invalid timestamp",
+ partitionResponse.errorMessage());
+ }
+
+ private Uuid getTopicId() throws ExecutionException, InterruptedException {
+ try (Admin admin = cluster.admin()) {
+ return admin.describeTopics(List.of(TOPIC))
+ .topicNameValues().get(TOPIC).get().topicId();
+ }
+ }
+
+ private TopicPartitionInfo findPartitionWithLeader() throws
ExecutionException, InterruptedException {
+ try (Admin admin = cluster.admin()) {
+ TopicDescription desc = admin.describeTopics(List.of(TOPIC))
+ .topicNameValues().get(TOPIC).get();
+ return desc.partitions().stream()
+ .filter(p -> p.leader() != null && p.leader().id() != -1)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("No partition with leader found for topic " + TOPIC));
+ }
+ }
+}