AndrewJSchofield commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2044426375
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -140,9 +129,17 @@
* thrown by a failure to commit the acknowledgements.</li>
* <li>The application calls {@link #close()} which releases any acquired
records without acknowledgement.</li>
* </ul>
- * <p>The consumer can optionally use the {@code
internal.share.acknowledgement.mode} configuration property to choose
- * between implicit and explicit acknowledgement, specifying
<code>"implicit"</code> or <code>"explicit"</code> as required.
- * <p>
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit
acknowledgement</em>. In this case:
+ * <ul>
+ * <li>The application must acknowledge all the records it received in the
batch before the next call to {@link #poll(Duration)}</li>
+ * <li>The application calls {@link #commitSync()} or {@link
#commitAsync()} which commits the acknowledgements to Kafka.
+ * If any records in the batch were not acknowledged until the next
poll(), an {@link IllegalStateException} is thrown.</li>
+ * <li>The application calls {@link #poll(Duration)} without committing
first, which commits the acknowledgements to
+ * Kafka asynchronously. In this case, no exception is thrown by a failure
to commit the acknowledgement.
+ * If any records in the batch were not acknowledged, an {@link
IllegalStateException} is thrown.</li>
+ * <li>The application calls {@link #close()} which attempts to commit any
pending acknowledgements and
+ * releases any remaining acquired records.</li>
+ * </ul>
Review Comment:
Let's have another try at this.
```
* <p>
* The consumer can choose to use implicit or explicit acknowledgement of
the records it processes by configuring the
* consumer {@code share.acknowledgement.mode} property.
* <p>
* If the application sets the property to "implicit" or does not set it at
all, then the consumer is using
* <em>implicit acknowledgement</em>. In this mode, the application
acknowledges delivery by:
* <ul>
* <li>Calling {@link #poll(Duration)} without committing, which also
implicitly acknowledges all of
* the delivered records and commits the acknowledgements to Kafka
asynchronously. In this case, no exception is
* thrown by a failure to commit the acknowledgements.</li>
* <li>Calling {@link #commitSync()} or {@link #commitAsync()} which
implicitly acknowledges all of
* the delivered records as processed successfully and commits the
acknowledgements to Kafka.</li>
* <li>Calling {@link #close()} which releases any acquired records
without acknowledgement.</li>
* </ul>
* If the application sets the property to "explicit", then the consumer is
using <em>explicit acknowledgement</em>.
* The application must acknowledge all records returned from {@link
#poll(Duration)} using
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next
call to {@link #poll(Duration)}.
* If the application calls {@link #poll(Duration)} without having
acknowledged all records, an
* {@link IllegalStateException} is thrown. The remaining unacknowledged
records can still be acknowledged.
* In this mode, the application acknowledges delivery by:
* <ul>
* <li>Calling {@link #poll(Duration)} after it has acknowledged all
records, which commits the acknowledgements
* to Kafka asynchronously. In this case, no exception is thrown by a
failure to commit the acknowledgements.</li>
* <li>Calling {@link #commitSync()} or {@link #commitAsync()} which
commits any pending
* acknowledgements to Kafka.</li>
* <li>Calling {@link #close()} which attempts to commit any pending
acknowledgements and releases
* any remaining acquired records.</li>
* </ul>
* The consumer guarantees that the records returned in the {@code
ConsumerRecords} object for a specific topic-partition
* are in order of increasing offset. For each topic-partition, Kafka
guarantees that acknowledgements for the records
* in a batch are performed atomically. This makes error handling
significantly more straightforward because there can be
* one error code per partition.
```
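To check that the wording above describes a consistent contract, here is a minimal in-memory sketch of the two modes. It is not Kafka code and not a proposal for the implementation: `AckModeSketch`, its `Mode` enum, `pendingCount()` and the use of bare offsets in place of `ConsumerRecord` are all hypothetical, and commits to the broker are not modelled at all.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the acknowledgement rules; not the Kafka client.
class AckModeSketch {
    enum Mode { IMPLICIT, EXPLICIT }

    private final Mode mode;
    // Offsets delivered by the last poll() that are still awaiting acknowledgement.
    private final Set<Long> unacknowledged = new LinkedHashSet<>();
    private long nextOffset = 0;

    AckModeSketch(Mode mode) {
        this.mode = mode;
    }

    // Delivers the next batchSize offsets in increasing order.
    List<Long> poll(int batchSize) {
        if (mode == Mode.IMPLICIT) {
            // Implicit mode: the next poll() acknowledges everything delivered before it.
            unacknowledged.clear();
        } else if (!unacknowledged.isEmpty()) {
            // Explicit mode: refuse to continue, but keep the pending offsets
            // so the application can still acknowledge them afterwards.
            throw new IllegalStateException(
                "All records must be acknowledged in explicit acknowledgement mode");
        }
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(nextOffset++);
        }
        unacknowledged.addAll(batch);
        return batch;
    }

    // Marks one delivered offset as acknowledged.
    void acknowledge(long offset) {
        if (!unacknowledged.remove(Long.valueOf(offset))) {
            throw new IllegalStateException("Offset " + offset + " is not awaiting acknowledgement");
        }
    }

    int pendingCount() {
        return unacknowledged.size();
    }

    public static void main(String[] args) {
        AckModeSketch consumer = new AckModeSketch(Mode.EXPLICIT);
        List<Long> batch = consumer.poll(3);
        consumer.acknowledge(batch.get(0));
        try {
            consumer.poll(3); // two offsets are still unacknowledged
        } catch (IllegalStateException e) {
            System.out.println("poll rejected: " + e.getMessage());
        }
        // The remaining offsets can still be acknowledged after the exception.
        consumer.acknowledge(batch.get(1));
        consumer.acknowledge(batch.get(2));
        System.out.println("next batch: " + consumer.poll(2));
    }
}
```

With `Mode.IMPLICIT` the same second `poll()` would succeed, because it acknowledges the previous batch on the application's behalf.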
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -673,6 +661,10 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition,
NodeAcknowledgements> ack
// Notify the network thread to wake up and start the next
round of fetching
applicationEventHandler.wakeupNetworkThread();
}
+ if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
+ // We cannot leave unacknowledged records in EXPLICIT
acknowledgement mode, so we throw an exception to the application.
+ throw new IllegalStateException("There are unacknowledged
records from the previous fetch : " + currentFetch.records());
Review Comment:
It would be much better not to append the entire set of records to the
exception message. I suggest "All records must be acknowledged in explicit
acknowledgement mode".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.