junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r2054494297
##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -85,4 +85,80 @@ class ProducerSendWhileDeletionTest extends
IntegrationTestHarness {
assertEquals(topic, producer.send(new ProducerRecord(topic, null,
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
}
+ /**
+ * Tests that Producer produce to new topic id after recreation.
+ *
+ * Producer will attempt to send messages to the partition specified in each
record, and should
+ * succeed as long as the metadata has been updated with new topic id.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testSendWithRecreatedTopic(quorum: String): Unit = {
+ val numRecords = 10
+ val topic = "topic"
+ createTopic(topic)
+ val admin = createAdminClient()
+ val topicId = topicMetadata(admin, topic).topicId()
+ val producer = createProducer()
+
+ (1 to numRecords).foreach { i =>
+ val resp = producer.send(new ProducerRecord(topic, null, ("value" +
i).getBytes(StandardCharsets.UTF_8))).get
+ assertEquals(topic, resp.topic())
+ }
+ // Start topic deletion
+ deleteTopic(topic, listenerName)
+
+ // Verify that the topic is deleted when no metadata request comes in
+ TestUtils.verifyTopicDeletion(topic, 2, brokers)
+ createTopic(topic)
+ assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+ // Producer should be able to send messages even after topic gets recreated
+ val recordMetadata: RecordMetadata = producer.send(new
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+ assertEquals(topic, recordMetadata.topic())
+ assertEquals(0, recordMetadata.offset())
+ }
+
+ /**
+ * Tests that Producer produce to topic during reassignment where topic
metadata change on broker side.
+ *
+ * Producer will attempt to send messages to the partition specified in each
record, and should
+ * succeed as long as the metadata cache on the leader includes the
partition topic id.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+ val numRecords = 10
+ val topic = "topic"
+ val partition0: TopicPartition = new TopicPartition(topic, 0)
+ val admin: Admin = createAdminClient()
+
+ // Create topic with leader as 0 for the 1 partition.
+ createTopicWithAssignment(topic, Map(0 -> Seq(0)))
+ TestUtils.assertLeader(admin, partition0, 0)
+
+ val topicDetails = topicMetadata(admin, topic)
+ val producer = createProducer()
+
+ (1 to numRecords).foreach { i =>
+ val resp = producer.send(new ProducerRecord(topic, null, ("value" +
i).getBytes(StandardCharsets.UTF_8))).get
+ assertEquals(topic, resp.topic())
+ }
+
+ val reassignment = Map(
+ partition0 -> Optional.of(new
NewPartitionReassignment(util.Arrays.asList(1))),
+ )
+
+ // Change assignment of one of the replicas from 0 to 1. Leadership moves
be 1.
Review Comment:
How about "Change replica assignment from 0 to 1. Leadership moves to 1."?
##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -102,13 +103,36 @@ public void testBuildWithCurrentMessageFormat() {
ProduceRequest.Builder requestBuilder = ProduceRequest.builder(
new ProduceRequestData()
.setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new
ProduceRequestData.TopicProduceData().setName("test").setPartitionData(Collections.singletonList(
- new
ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
+ new ProduceRequestData.TopicProduceData()
+
.setTopicId(Uuid.fromString("H3Emm3vW7AKKO4NTRPaCWt"))
+ .setPartitionData(Collections.singletonList(
+ new
ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
.iterator()))
.setAcks((short) 1)
.setTimeoutMs(5000),
false);
- assertEquals(3, requestBuilder.oldestAllowedVersion());
+ assertEquals(ApiKeys.PRODUCE.oldestVersion(),
requestBuilder.oldestAllowedVersion());
+ assertEquals(ApiKeys.PRODUCE.latestVersion(),
requestBuilder.latestAllowedVersion());
+ }
+
+ @Test
+ public void testBuildWithCurrentMessageFormatWithoutTopicId() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+ ProduceRequest.Builder requestBuilder = ProduceRequest.builder(
+ new ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new
ProduceRequestData.TopicProduceData()
+ .setName("test")
Review Comment:
Could we add a comment to this test? If the topicId is not set, we will send
the request with the default (zero) topicId, and the client will get an
UNKNOWN_TOPIC_ID error.
##########
core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:
##########
@@ -214,19 +217,20 @@ class ProduceRequestTest extends BaseRequestTest {
@ValueSource(strings = Array("kraft"))
def testCorruptLz4ProduceRequest(quorum: String): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
+ val topicId = getTopicIds().get("topic").get
val timestamp = 1000000
val memoryRecords = MemoryRecords.withRecords(Compression.lz4().build(),
new SimpleRecord(timestamp, "key".getBytes, "value".getBytes))
// Change the lz4 checksum value (not the kafka record crc) so that it
doesn't match the contents
val lz4ChecksumOffset = 6
memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD
+ lz4ChecksumOffset, 0)
- val topicPartition = new TopicPartition("topic", partition)
val produceResponse = sendProduceRequest(leader,
ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
- .setName(topicPartition.topic())
+ .setName("topic")
Review Comment:
Should we remove `setName`?
##########
core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:
##########
@@ -182,13 +185,13 @@ class ProduceRequestTest extends BaseRequestTest {
// Send the produce request to the non-replica
val records = MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("key".getBytes, "value".getBytes))
- val topicPartition = new TopicPartition("topic", partition)
val produceRequest = ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
- .setName(topicPartition.topic())
+ .setName("topic")
Review Comment:
Should we remove `setName`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]