AndrewJSchofield commented on code in PR #20148:
URL: https://github.com/apache/kafka/pull/20148#discussion_r2217238430
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java:
##########
@@ -123,6 +125,29 @@ public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type)
throw new IllegalStateException("The record cannot be acknowledged.");
}
+ /**
+ * Acknowledge a single record by its topic, partition and offset in the current batch.
+ *
+ * @param topic The topic of the record to acknowledge
+ * @param partition The partition of the record
+ * @param offset The offset of the record
+ * @param type The acknowledgment type which indicates whether it was processed successfully
+ */
+ public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
+ for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
+ TopicIdPartition tip = tipBatch.getKey();
+ KafkaException shareException = tipBatch.getValue().getException();
+ if (tip.topic().equals(topic) && (tip.partition() == partition) &&
+ shareException instanceof RecordDeserializationException &&
Review Comment:
Yes, I was thinking about this case last night and you're absolutely
correct. Still, `ShareFetchException` could perhaps return an offset where
appropriate, so the caller does not need `instanceof` and a cast. There has
to be a more elegant, OO way of achieving this.
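
To make the suggestion concrete, here is a minimal, self-contained sketch of the idea. It does not use Kafka's classes: `FetchError`, `DeserializationError`, `failedOffset()` and `canAcknowledge()` are hypothetical names invented for illustration, standing in for whatever shape `ShareFetchException` might eventually take. The base exception exposes an optional offset, and only the subtype that knows the failing record overrides it, so the caller never inspects the concrete type.

```java
import java.util.OptionalLong;

public class OffsetSketch {

    /** Hypothetical base exception for a failed fetch; NOT Kafka's real class. */
    static class FetchError extends RuntimeException {
        FetchError(String message) {
            super(message);
        }

        /** Offset of the record that caused the failure, if one can be identified. */
        OptionalLong failedOffset() {
            return OptionalLong.empty();
        }
    }

    /** Hypothetical subtype for a failure tied to one specific record. */
    static class DeserializationError extends FetchError {
        private final long offset;

        DeserializationError(String message, long offset) {
            super(message);
            this.offset = offset;
        }

        @Override
        OptionalLong failedOffset() {
            return OptionalLong.of(offset);
        }
    }

    /** Caller-side check: no instanceof, no cast. */
    static boolean canAcknowledge(FetchError error, long offset) {
        if (error == null) {
            return false;
        }
        OptionalLong failed = error.failedOffset();
        return failed.isPresent() && failed.getAsLong() == offset;
    }

    public static void main(String[] args) {
        FetchError generic = new FetchError("fetch failed");
        FetchError bad = new DeserializationError("cannot deserialize", 42L);

        System.out.println(canAcknowledge(generic, 42L)); // false: no offset known
        System.out.println(canAcknowledge(bad, 42L));     // true: offsets match
        System.out.println(canAcknowledge(bad, 43L));     // false: different offset
    }
}
```

With something along these lines, the condition in the diff above would reduce to comparing the topic, the partition and the optional offset, and a future exception type that also carries an offset would work without touching the caller.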