lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1777058782


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -162,23 +163,44 @@ private void process(final PollEvent event) {
     }
 
     private void process(final AsyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
-            return;
-        }
-
-        CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
-        future.whenComplete(complete(event.future()));
+        process((CommitEvent) event);

Review Comment:
   We have two separate event types (async and sync), which we join into one
   here and then split again for the actual processing with:
   ```
   if (event.type() == Type.COMMIT_ASYNC) {
                   future = manager.commitAsync(offsets);
               } else {
                   future = manager.commitSync(offsets, event.deadlineMs());
               }
   ```
   I get that this lets us reuse a bit of code, but I wonder if it's worth the
   twisted flow. Could we keep them separate (as they are when the events are
   created), with process(Sync) calling manager.commitSync and process(Async)
   calling manager.commitAsync, and just encapsulate in functions what we want
   to reuse in both? For example, a maybeUpdateLastSeenEpochIfNewer() holding
   lines 188-191 that would be called from both, and similarly for the logic
   that retrieves the offsets from the event (lines 180-181). What do you think?
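
   A rough sketch of that shape, under loud assumptions: every type below
   (SketchProcessor, SketchCommitManager, the Sketch*CommitEvent classes) is a
   hypothetical stand-in, not the real Kafka class. Partitions are plain
   strings, leader epochs are left out, and the shared helpers only mimic the
   reuse points mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the commit request manager; it only records calls.
class SketchCommitManager {
    final List<String> calls = new ArrayList<>();

    CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
        calls.add("commitAsync");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> commitSync(Map<String, Long> offsets, long deadlineMs) {
        calls.add("commitSync@" + deadlineMs);
        return CompletableFuture.completedFuture(null);
    }
}

// Hypothetical events: a common base plus one subclass per commit flavour.
class SketchCommitEvent {
    final Map<String, Long> offsets;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    SketchCommitEvent(Map<String, Long> offsets) {
        this.offsets = offsets;
    }
}

class SketchAsyncCommitEvent extends SketchCommitEvent {
    SketchAsyncCommitEvent(Map<String, Long> offsets) {
        super(offsets);
    }
}

class SketchSyncCommitEvent extends SketchCommitEvent {
    final long deadlineMs;

    SketchSyncCommitEvent(Map<String, Long> offsets, long deadlineMs) {
        super(offsets);
        this.deadlineMs = deadlineMs;
    }
}

class SketchProcessor {
    private final Optional<SketchCommitManager> commitManager;
    // Stands in for the metadata epoch update; records the partitions seen.
    final List<String> epochUpdates = new ArrayList<>();

    SketchProcessor(Optional<SketchCommitManager> commitManager) {
        this.commitManager = commitManager;
    }

    // One entry point per event type: no join-then-split on event.type().
    void process(SketchAsyncCommitEvent event) {
        if (!commitManager.isPresent()) {
            return; // mirrors the early return in the removed code
        }
        Map<String, Long> offsets = prepareOffsets(event);
        relay(commitManager.get().commitAsync(offsets), event.future);
    }

    void process(SketchSyncCommitEvent event) {
        if (!commitManager.isPresent()) {
            return;
        }
        Map<String, Long> offsets = prepareOffsets(event);
        relay(commitManager.get().commitSync(offsets, event.deadlineMs), event.future);
    }

    // Shared helper: fetch the offsets from the event and note the epoch update.
    private Map<String, Long> prepareOffsets(SketchCommitEvent event) {
        epochUpdates.addAll(event.offsets.keySet());
        return event.offsets;
    }

    // Shared helper: propagate the manager's result to the event's future.
    private static void relay(CompletableFuture<Void> from, CompletableFuture<Void> to) {
        from.whenComplete((ignored, error) -> {
            if (error != null) {
                to.completeExceptionally(error);
            } else {
                to.complete(null);
            }
        });
    }
}

class CommitDispatchSketch {
    public static void main(String[] args) {
        SketchCommitManager manager = new SketchCommitManager();
        SketchProcessor processor = new SketchProcessor(Optional.of(manager));
        processor.process(new SketchSyncCommitEvent(Collections.singletonMap("topic-0", 10L), 12345L));
        processor.process(new SketchAsyncCommitEvent(Collections.singletonMap("topic-0", 11L)));
        System.out.println(manager.calls);
    }
}
```

   Each overload stays a straight line from event to manager call, and the
   only shared pieces are the two small private helpers.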



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSyncCommitEventWithOffsets(boolean withGroupId) {

Review Comment:
   Since this is testing commit, it does need a group id, so I'd expect it to
   run only for withGroupId=true.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSyncCommitEventWithOffsets(boolean withGroupId) {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(offsets, false, deadlineMs);
+
+        setupProcessor(withGroupId);
+        if (withGroupId) {
+            doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+            doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+        }
+
+        processor.process(event);
+        verify(subscriptionState, never()).allConsumed();
+        if (withGroupId) {
+            verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+            verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        } else {
+            verify(metadata, never()).updateLastSeenEpochIfNewer(tp, 1);
+            verify(commitRequestManager, never()).commitSync(offsets, deadlineMs);
+        }
+        assertDoesNotThrow(() -> event.future().get());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSyncCommitEventWithCommitAllConsumed(boolean withGroupId) {

Review Comment:
   Same here: this is only relevant for withGroupId=true, right? (The same
   applies to all the other commit tests below.)


