lianetm commented on code in PR #17035:
URL: https://github.com/apache/kafka/pull/17035#discussion_r1757119740


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -65,16 +67,59 @@ protected void maybeThrowAuthFailure(Node node) {
         networkClientDelegate.maybeThrowAuthFailure(node);
     }
 
+    /**
+     * Request that a fetch request be issued to the cluster to pull down the next batch of records.
+     *
+     * <p/>
+     *
+     * The returned {@link CompletableFuture} is {@link CompletableFuture#complete(Object) completed} when the
+     * fetch request(s) have been created and enqueued into the network client's outgoing send buffer.
+     * It is <em>not completed</em> when the network client has received the data.
+     *
+     * @return Future for which the caller can wait to ensure that the requests have been enqueued
+     */
+    public CompletableFuture<Void> requestFetch() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        if (pendingFetchRequestFuture != null) {
+            // In this case, we have an outstanding fetch request, so chain the newly created future to be
+            // completed when the outstanding fetch request is completed.
+            pendingFetchRequestFuture.whenComplete((value, exception) -> {
+                if (exception != null) {
+                    future.completeExceptionally(exception);
+                } else {
+                    future.complete(value);
+                }
+            });
+        } else {
+            pendingFetchRequestFuture = future;
+        }
+
+        return future;
+    }
+
     /**
      * {@inheritDoc}
      */
     @Override
     public PollResult poll(long currentTimeMs) {
-        return pollInternal(
+        if (pendingFetchRequestFuture == null) {
+            // If no explicit request for fetching has been issued, just short-circuit.
+            return PollResult.EMPTY;
+        }
+
+        try {
+            return pollInternal(
                 prepareFetchRequests(),
                 this::handleFetchSuccess,
                 this::handleFetchFailure
-        );
+            );
+        } finally {
+            // Completing the future here means that the caller knows that the fetch request logic has been

Review Comment:
   "the fetch request logic has been performed" seems confusingly vague (it could
hint at more than what we really do here, I'm afraid). Maybe just say that the
caller knows that the fetch request has been generated?
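
To make the distinction concrete, here is a minimal standalone sketch of the pattern in the hunk above. It is not the PR's code: the class name `FetchFutureSketch`, the `pending` field, and the boolean return of `poll()` are hypothetical stand-ins, and the body of the `finally` block (complete the future with `null`, then clear it) is an assumption, since the quoted hunk is cut off at that point. What it illustrates is that the future completes as soon as the poll pass has generated the request, not when any data arrives.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the request manager in the diff.
public class FetchFutureSketch {
    // Stand-in for pendingFetchRequestFuture; null means no fetch was requested.
    private CompletableFuture<Void> pending;

    // Mirrors requestFetch(): the first caller's future becomes the pending one,
    // later callers get a future chained to it.
    public CompletableFuture<Void> requestFetch() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (pending != null) {
            pending.whenComplete((value, exception) -> {
                if (exception != null) {
                    future.completeExceptionally(exception);
                } else {
                    future.complete(value);
                }
            });
        } else {
            pending = future;
        }
        return future;
    }

    // Returns true if a fetch request was generated on this pass.
    public boolean poll() {
        if (pending == null) {
            // No explicit request for fetching, so short-circuit.
            return false;
        }
        try {
            // The real code would build and enqueue the fetch requests here.
            return true;
        } finally {
            // Assumed behavior: signal "request generated", then reset.
            // No response has been received at this point.
            pending.complete(null);
            pending = null;
        }
    }
}
```

With this shape, two `requestFetch()` calls made before a single `poll()` both complete on that one pass, and the next `poll()` short-circuits until another fetch is requested.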


