DL1231 commented on code in PR #20148:
URL: https://github.com/apache/kafka/pull/20148#discussion_r2217121724
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
}
}
+ @ClusterTest
+ public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT),
+ null,
+ mockErrorDeserializer(3))) {
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record3 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.send(record2);
+ producer.send(record3);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofSeconds(60));
+ assertEquals(2, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
records.iterator();
+
+ ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+ ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+ assertEquals(0L, firstRecord.offset());
+ assertEquals(1L, secondRecord.offset());
+ shareConsumer.acknowledge(firstRecord);
+ shareConsumer.acknowledge(secondRecord);
+
+ RecordDeserializationException rde =
assertThrows(RecordDeserializationException.class, () ->
shareConsumer.poll(Duration.ofSeconds(60)));
+ assertEquals(2, rde.offset());
+ shareConsumer.commitSync();
+
+ // The corrupted record was automatically released, so we can still obtain it.
+ rde = assertThrows(RecordDeserializationException.class, () ->
shareConsumer.poll(Duration.ofSeconds(60)));
+ assertEquals(2, rde.offset());
+
+ // Reject this record
+ shareConsumer.acknowledge(rde.topicPartition().topic(),
rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+ shareConsumer.commitSync();
+
+ records = shareConsumer.poll(Duration.ZERO);
+ assertEquals(0, records.count());
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+ }
+
+ @ClusterTest
+ public void testExplicitAcknowledgeOffsetThrowsNotException() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))) {
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofSeconds(60));
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> consumedRecord =
records.records(tp).get(0);
+ assertEquals(0L, consumedRecord.offset());
+
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(),
AcknowledgeType.ACCEPT));
+
+ shareConsumer.acknowledge(consumedRecord);
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+ }
+
+ @ClusterTest
+ public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT),
+ null,
+ mockErrorDeserializer(2))) {
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.send(record2);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofSeconds(60));
+ assertEquals(1, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
records.iterator();
+
+ ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+ assertEquals(0L, firstRecord.offset());
+ shareConsumer.acknowledge(firstRecord);
+
+ final RecordDeserializationException rde =
assertThrows(RecordDeserializationException.class, () ->
shareConsumer.poll(Duration.ofSeconds(60)));
+ assertEquals(1, rde.offset());
+
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge("foo", rde.topicPartition().partition(),
rde.offset(), AcknowledgeType.REJECT));
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(),
AcknowledgeType.REJECT));
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0,
AcknowledgeType.REJECT));
+
+ // Reject this record
+ shareConsumer.acknowledge(rde.topicPartition().topic(),
rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+ shareConsumer.commitSync();
Review Comment:
Good catch! I have updated the patch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org