AndrewJSchofield commented on code in PR #20148:
URL: https://github.com/apache/kafka/pull/20148#discussion_r2216370644
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java:
##########
@@ -123,6 +125,29 @@ public void acknowledge(final ConsumerRecord<K, V> record,
AcknowledgeType type)
throw new IllegalStateException("The record cannot be acknowledged.");
}
+ /**
+ * Acknowledge a single record by its topic, partition and offset in the
current batch.
+ *
+ * @param topic The topic of the record to acknowledge
+ * @param partition The partition of the record
+ * @param offset The offset of the record
+ * @param type The acknowledgment type which indicates whether it was
processed successfully
+ */
+ public void acknowledge(final String topic, final int partition, final
long offset, final AcknowledgeType type) {
+ for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch :
batches.entrySet()) {
+ TopicIdPartition tip = tipBatch.getKey();
+ KafkaException shareException = tipBatch.getValue().getException();
+ if (tip.topic().equals(topic) && (tip.partition() == partition) &&
+ shareException instanceof RecordDeserializationException &&
Review Comment:
I don't like the `instanceof` and then the cast. One way around this I
suppose would be to `ShareInFlightBatch` have an exception which has a known
type, `ShareFetchException` I guess. Then `ShareFetchException` could have
methods which extract the topic, partition and offset from the
`RecordDeserialization` exception, and then you just need
`shareException.offset() == offset)`. wdyt?
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
}
}
+ @ClusterTest
+ public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT),
+ null,
+ mockErrorDeserializer(3))) {
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record3 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.send(record2);
+ producer.send(record3);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofSeconds(60));
+ assertEquals(2, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
records.iterator();
+
+ ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+ ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+ assertEquals(0L, firstRecord.offset());
+ assertEquals(1L, secondRecord.offset());
+ shareConsumer.acknowledge(firstRecord);
+ shareConsumer.acknowledge(secondRecord);
+
+ RecordDeserializationException rde =
assertThrows(RecordDeserializationException.class, () ->
shareConsumer.poll(Duration.ofSeconds(60)));
+ assertEquals(2, rde.offset());
+ shareConsumer.commitSync();
+
+ // The corrupted record was automatically released, so we can
still obtain it.
+ rde = assertThrows(RecordDeserializationException.class, () ->
shareConsumer.poll(Duration.ofSeconds(60)));
+ assertEquals(2, rde.offset());
+
+ // Reject this record
+ shareConsumer.acknowledge(rde.topicPartition().topic(),
rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+ shareConsumer.commitSync();
+
+ records = shareConsumer.poll(Duration.ZERO);
+ assertEquals(0, records.count());
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+ }
+
+ @ClusterTest
+ public void testExplicitAcknowledgeOffsetThrowsNotException() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))) {
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofSeconds(60));
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> consumedRecord =
records.records(tp).get(0);
+ assertEquals(0L, consumedRecord.offset());
+
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(),
AcknowledgeType.ACCEPT));
+
+ shareConsumer.acknowledge(consumedRecord);
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+ }
+
+ @ClusterTest
+ public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT),
+ null,
+ mockErrorDeserializer(2))) {
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.send(record2);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofSeconds(60));
+ assertEquals(1, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> iterator =
records.iterator();
+
+ ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+ assertEquals(0L, firstRecord.offset());
+ shareConsumer.acknowledge(firstRecord);
+
+ final RecordDeserializationException rde =
assertThrows(RecordDeserializationException.class, () ->
shareConsumer.poll(Duration.ofSeconds(60)));
+ assertEquals(1, rde.offset());
+
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge("foo", rde.topicPartition().partition(),
rde.offset(), AcknowledgeType.REJECT));
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(),
AcknowledgeType.REJECT));
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0,
AcknowledgeType.REJECT));
+
+ // Reject this record
+ shareConsumer.acknowledge(rde.topicPartition().topic(),
rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+ shareConsumer.commitSync();
Review Comment:
And what happens if you repeat the acknowledge call after the commit?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchException.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.SerializationException;
+
+public class ShareFetchException extends SerializationException {
+
+ private final KafkaException origin;
+
+ private final ShareFetch<?, ?> shareFetch;
+
+ public ShareFetchException(KafkaException exception, ShareFetch<?, ?>
shareFetch) {
Review Comment:
I suggest `ShareFetchException(ShareFetch<?, ?> shareFetch, KafkaException
cause)`. If anything, the origin seems to be the `ShareFetch` to me.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java:
##########
@@ -110,7 +112,7 @@ public boolean isEmpty() {
* Acknowledge a single record in the current batch.
*
* @param record The record to acknowledge
- * @param type The acknowledge type which indicates whether it was
processed successfully
+ * @param type The acknowledgment type which indicates whether it was
processed successfully
*/
public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType
type) {
Review Comment:
Let's make this `type` parameter final too.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -507,6 +507,30 @@ public void acknowledge(ConsumerRecord<K, V> record,
AcknowledgeType type) {
delegate.acknowledge(record, type);
}
+ /**
+ * Acknowledge delivery of a specific record by its topic, partition, and
offset, indicating whether
+ * it was processed successfully. The acknowledgement is committed on the
next {@link #commitSync()},
+ * {@link #commitAsync()} or {@link #poll(Duration)} call.
+ * <p>
+ * This method provides an alternative to {@link
#acknowledge(ConsumerRecord, AcknowledgeType)} when
+ * the full record is unavailable. It is typically used for manual offset
management scenarios.
+ * <p>
+ * This method can only be used if the consumer is using <b>explicit
acknowledgement</b>.
+ *
+ * @param topic The topic of the record to acknowledge
+ * @param partition The partition of the record to acknowledge
+ * @param offset The offset of the record to acknowledge
+ * @param type The acknowledgement type which indicates whether it
was processed successfully
+ *
+ * @throws IllegalStateException if the specified record is not pending
acknowledgement,
Review Comment:
The KIP has a javadoc block which is a bit more accurate. Please take it
from the KIP. (The "manual offset management scenarios" bit is strange.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]