AndrewJSchofield commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2039155579
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -381,18 +381,17 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
/**
- * <code>share.acknowledgement.mode</code> is being evaluated as a new
configuration to control the acknowledgement mode
- * for share consumers. It will be removed or converted to a proper
configuration before release.
- * An alternative being considered is
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+ * <code>share.acknowledgement.mode</code> is a config to control the
acknowledgement mode
Review Comment:
Just `<code>share.acknowledgement.mode</code>` is sufficient. Because this
config is no longer internal, it will appear in the documentation for consumer
configs for free.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -381,18 +381,17 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
/**
- * <code>share.acknowledgement.mode</code> is being evaluated as a new
configuration to control the acknowledgement mode
- * for share consumers. It will be removed or converted to a proper
configuration before release.
- * An alternative being considered is
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+ * <code>share.acknowledgement.mode</code> is a config to control the
acknowledgement mode
+ * for share consumers. It can be set to implicit or explicit.
*/
- public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG =
"internal.share.acknowledgement.mode";
- private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC =
"Controls the acknowledgement mode for a share consumer." +
- " If unset, the acknowledgement mode of the consumer is decided by
the method calls it uses to fetch and commit." +
+ public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG =
"share.acknowledgement.mode";
+ private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the
acknowledgement mode for a share consumer." +
" If set to <code>implicit</code>, the acknowledgement mode of the
consumer is implicit and it must not" +
" use
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to
acknowledge delivery of records. Instead," +
" delivery is acknowledged implicitly on the next call to poll or
commit." +
" If set to <code>explicit</code>, the acknowledgement mode of the
consumer is explicit and it must use" +
- "
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to
acknowledge delivery of records.";
+ "
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to
acknowledge delivery of records." +
+ " If unset, the acknowledgement mode of the consumer is set to
'implicit' by default";
Review Comment:
Maybe the final sentence would be better as `Otherwise, the acknowledgement
mode is implicit.`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -695,12 +694,12 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
-
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
+
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
Type.STRING,
null,
in(null, "implicit", "explicit"),
Review Comment:
I think you should follow the style of `AutoOffsetResetStrategy` or
`GroupProtocol` for this configuration. In summary:
* Define an enum/class called `ShareAcknowledgementMode` in
`o.a.k.clients.consumer.internals`.
* Validate using the names of the enum constants, uppercasing and then
matching so the application can use `Explicit` or `eXplicit` or whatever.
* Do not permit a null value. The default would be
`ShareAcknowledgementMode.IMPLICIT.name().toLowerCase(Locale.ROOT)`.
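To make the suggestion concrete, here is a rough, dependency-free sketch of what such an enum could look like. It is only an illustration of the three points above: the method names `configValue`, `validValues` and `fromString`, the `DEFAULT` field and the error messages are all assumptions, and the wiring into `ConfigDef` validation is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical sketch only; the class that ends up in the PR may look different.
enum ShareAcknowledgementMode {
    IMPLICIT,
    EXPLICIT;

    // Default config value, derived from the enum constant rather than a string literal.
    static final String DEFAULT = IMPLICIT.configValue();

    // Lower-case form of the constant name, as it would be written in a config file.
    String configValue() {
        return name().toLowerCase(Locale.ROOT);
    }

    // All accepted values in lower case, useful for validation and error messages.
    static List<String> validValues() {
        return Arrays.stream(values())
                .map(ShareAcknowledgementMode::configValue)
                .collect(Collectors.toList());
    }

    // Case-insensitive parse: upper-cases the input and matches it against the
    // enum constant names. Null and unknown values are rejected.
    static ShareAcknowledgementMode fromString(String value) {
        if (value == null) {
            throw new IllegalArgumentException(
                    "share.acknowledgement.mode must not be null; valid values are " + validValues());
        }
        try {
            return valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Invalid share.acknowledgement.mode '" + value
                            + "'; valid values are " + validValues(), e);
        }
    }
}
```

With something along these lines, `Explicit`, `eXplicit` and `explicit` would all resolve to `EXPLICIT`, and the `null` entry in the validator would no longer be needed.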
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
* as naturally happens when the locks time out. This limit is controlled by
the broker configuration property
* {@code group.share.record.lock.partition.limit}. By limiting the duration
of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the
presence of consumer failures.
+ *
* <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit
acknowledgement</em>. In this case:
* <ul>
+ * <li>The application is expected to acknowledge all the records it
received in the batch before the next call to ({@link #poll(Duration)}</li>
+ * <li> If the application has some unacknowledged records before the next
call to ({@link #poll(Duration)}, then the poll()
Review Comment:
nit: Extra space after `<li>`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
* as naturally happens when the locks time out. This limit is controlled by
the broker configuration property
* {@code group.share.record.lock.partition.limit}. By limiting the duration
of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the
presence of consumer failures.
+ *
Review Comment:
nit: The style of the rest of this javadoc comment is not to have blank
lines before the `<p>` and to have the `<p>` on a line by itself.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
* as naturally happens when the locks time out. This limit is controlled by
the broker configuration property
* {@code group.share.record.lock.partition.limit}. By limiting the duration
of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the
presence of consumer failures.
+ *
* <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit
acknowledgement</em>. In this case:
* <ul>
+ * <li>The application is expected to acknowledge all the records it
received in the batch before the next call to ({@link #poll(Duration)}</li>
+ * <li> If the application has some unacknowledged records before the next
call to ({@link #poll(Duration)}, then the poll()
+ * throws an {@link IllegalStateException}. This is because in explicit
mode, all the records in the batch should be acknowledged before the next call
to poll()</li>
* <li>The application calls {@link #commitSync()} or {@link
#commitAsync()} which commits the acknowledgements to Kafka.
- * If any records in the batch were not acknowledged, they remain acquired
and will be presented to the application
- * in response to a future poll.</li>
+ * If any records in the batch were not acknowledged until the next
poll(), an {@link IllegalStateException} is thrown.</li>
* <li>The application calls {@link #poll(Duration)} without committing
first, which commits the acknowledgements to
* Kafka asynchronously. In this case, no exception is thrown by a failure
to commit the acknowledgement.
- * If any records in the batch were not acknowledged, they remain acquired
and will be presented to the application
- * in response to a future poll.</li>
+ * If any records in the batch were not acknowledged, an {@link
IllegalStateException} is thrown.</li>
* <li>The application calls {@link #close()} which attempts to commit any
pending acknowledgements and
* releases any remaining acquired records.</li>
* </ul>
- * If the application does not call {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>implicit acknowledgement</em>. In this case:
+ * If the application sets the {@code share.acknowledgement.mode} property to
"implicit" or does not configure the mode, then
Review Comment:
Maybe `does not set the {@code share.acknowledgement.mode} property to
"explicit", then`...
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -748,28 +713,33 @@ public void
testExplicitAcknowledgementCommitAsyncPartialBatch() {
// Acknowledging 2 out of the 3 records received via commitAsync.
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+ ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer1.acknowledge(firstRecord);
shareConsumer1.acknowledge(secondRecord);
shareConsumer1.commitAsync();
- // The 3rd record should be re-presented to the consumer when it
polls again.
- records = shareConsumer1.poll(Duration.ofMillis(5000));
- assertEquals(1, records.count());
- iterator = records.iterator();
- firstRecord = iterator.next();
- assertEquals(2L, firstRecord.offset());
+ producer.send(record4);
+ producer.flush();
+
+ // The next poll() should throw an IllegalStateException as there
is still 1 unacknowledged record.
+ // In EXPLICIT acknowledgement mode, we are not allowed to have
unacknowledged records from a batch.
+ assertThrows(IllegalStateException.class, () ->
shareConsumer1.poll(Duration.ofMillis(5000)));
+
+ // Acknowledging the 3rd record
Review Comment:
Hmm. Interesting. So, can I still use the `records` collection from the last
successful `poll(Duration)`? What if I reset the iterator and start running
through it again?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -156,6 +157,7 @@
* This example demonstrates implicit acknowledgement using {@link
#poll(Duration)} to acknowledge the records which
* were delivered in the previous poll. All the records delivered are
implicitly marked as successfully consumed and
* acknowledged synchronously with Kafka as the consumer fetches more records.
+ * The <code>share.acknowledgement.mode</code> property is not configured, so
it is set to "implicit" by default.
Review Comment:
I'd omit this line.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -140,9 +143,7 @@
* thrown by a failure to commit the acknowledgements.</li>
* <li>The application calls {@link #close()} which releases any acquired
records without acknowledgement.</li>
* </ul>
- * <p>The consumer can optionally use the {@code
internal.share.acknowledgement.mode} configuration property to choose
- * between implicit and explicit acknowledgement, specifying
<code>"implicit"</code> or <code>"explicit"</code> as required.
- * <p>
+ *
Review Comment:
I think you should have a `<p>` and no blank line.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
* as naturally happens when the locks time out. This limit is controlled by
the broker configuration property
* {@code group.share.record.lock.partition.limit}. By limiting the duration
of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the
presence of consumer failures.
+ *
* <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit
acknowledgement</em>. In this case:
* <ul>
+ * <li>The application is expected to acknowledge all the records it
received in the batch before the next call to ({@link #poll(Duration)}</li>
+ * <li> If the application has some unacknowledged records before the next
call to ({@link #poll(Duration)}, then the poll()
Review Comment:
Actually, I would remove this sentence. We can say it more clearly with
fewer words.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -588,7 +588,7 @@ public void testControlRecordsSkipped() throws Exception {
public void testExplicitAcknowledgeSuccess() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {
Review Comment:
I'd like to see constants for "explicit" and "implicit" for tests, apart
from situations where invalid values are being tested intentionally.
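As a rough illustration of what I mean, something like the following would do. The holder class name, the constant names and the helper method are all hypothetical, and the config key is spelled out as a literal here only to keep the sketch self-contained; in the tests it would come from `ConsumerConfig`.

```java
import java.util.Map;

// Hypothetical test helper; names are illustrative and not taken from the PR.
final class ShareAckModeTestConstants {
    // Stand-in for ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG.
    static final String SHARE_ACKNOWLEDGEMENT_MODE = "share.acknowledgement.mode";
    static final String EXPLICIT = "explicit";
    static final String IMPLICIT = "implicit";

    private ShareAckModeTestConstants() { }

    // Extra consumer properties for a test that needs explicit acknowledgement.
    static Map<String, Object> explicitModeConfig() {
        return Map.of(SHARE_ACKNOWLEDGEMENT_MODE, EXPLICIT);
    }
}
```

Tests that deliberately pass an invalid value would keep using a string literal so the intent stays obvious.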
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -181,6 +183,7 @@
* props.setProperty("group.id", "test");
* props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
* props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("share.acknowledgement.mode",
"implicit");
Review Comment:
I'd omit this line as well.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java:
##########
@@ -115,23 +115,26 @@
* as naturally happens when the locks time out. This limit is controlled by
the broker configuration property
* {@code group.share.record.lock.partition.limit}. By limiting the duration
of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the
presence of consumer failures.
+ *
* <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes by configuring the
+ * {@link ConsumerConfig#SHARE_ACKNOWLEDGEMENT_MODE_CONFIG} property. If the
property is not set, the default mode is <code>"implicit"</code>.
+ *
+ * <p>If the config is set to "explicit", the consumer is using <em>explicit
acknowledgement</em>. In this case:
* <ul>
+ * <li>The application is expected to acknowledge all the records it
received in the batch before the next call to ({@link #poll(Duration)}</li>
Review Comment:
Let's make it stronger. `The application must acknowledge all the records it
received in the batch before`... Also, the line lengths are intentionally quite
short here for readability so please split this line.