AndrewJSchofield commented on code in PR #17035:
URL: https://github.com/apache/kafka/pull/17035#discussion_r1746361177


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchEvent.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
+
+/**
+ * {@code FetchEvent} is sent from the consumer to signal that we want to 
issue a fetch request for the partitions
+ * to which the consumer is currently subscribed.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this event is completed when the {@link FetchRequestManager} 
has finished performing the
+ * fetch request process. It does not mean that the requests are complete. It 
could be the case that no fetch
+ * requests were created. Also of note is that if any fetch requests were 
created.

Review Comment:
   I don't quite understand the sentence "Also of note is that if any fetch 
requests were created.".  I think I understand what the PR does - this event 
completes when the background thread has created any FetchRequests to be sent 
by the network client delegate. There might be none to send and the event 
completes in this case too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -65,16 +67,59 @@ protected void maybeThrowAuthFailure(Node node) {
         networkClientDelegate.maybeThrowAuthFailure(node);
     }
 
+    /**
+     * Request that a fetch request be issued to the cluster to pull down the 
next batch of records.
+     *
+     * <p/>
+     *
+     * The returned {@link CompletableFuture} is {@link 
CompletableFuture#complete(Object) completed} when the
+     * fetch request(s) have been created and enqueued into the network 
client's outgoing send buffer.
+     * It is <em>not completed</em> when the network client has received the 
data.
+     *
+     * @return Future for which the caller can wait to ensure that the 
requests have been enqueued
+     */
+    public CompletableFuture<Void> requestFetch() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        if (pendingFetchRequestFuture != null) {
+            // In this case, we have an outstanding fetch request, so chain 
the newly created future to be
+            // invoked when the outstanding fetch request is completed.
+            pendingFetchRequestFuture.whenComplete((value, exception) -> {
+                if (exception != null) {

Review Comment:
   Given that you only complete the pending event successfully in the finally 
block of `poll()`, you could probably just `future.complete(null)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to