AndrewJSchofield commented on code in PR #17035: URL: https://github.com/apache/kafka/pull/17035#discussion_r1746361177
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchEvent.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
+
+/**
+ * {@code FetchEvent} is sent from the consumer to signal that we want to issue a fetch request for the partitions
+ * to which the consumer is currently subscribed.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this event is completed when the {@link FetchRequestManager} has finished performing the
+ * fetch request process. It does not mean that the requests are complete. It could be the case that no fetch
+ * requests were created. Also of note is that if any fetch requests were created.

Review Comment:
I don't quite understand the sentence "Also of note is that if any fetch requests were created.". I think I understand what the PR does: this event completes when the background thread has created any FetchRequests to be sent by the network client delegate. There might be none to send, and the event completes in that case too.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -65,16 +67,59 @@ protected void maybeThrowAuthFailure(Node node) {
         networkClientDelegate.maybeThrowAuthFailure(node);
     }
+    /**
+     * Request that a fetch request be issued to the cluster to pull down the next batch of records.
+     *
+     * <p/>
+     *
+     * The returned {@link CompletableFuture} is {@link CompletableFuture#complete(Object) completed} when the
+     * fetch request(s) have been created and enqueued into the network client's outgoing send buffer.
+     * It is <em>not completed</em> when the network client has received the data.
+     *
+     * @return Future for which the caller can wait to ensure that the requests have been enqueued
+     */
+    public CompletableFuture<Void> requestFetch() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        if (pendingFetchRequestFuture != null) {
+            // In this case, we have an outstanding fetch request, so chain the newly created future to be
+            // invoked when the outstanding fetch request is completed.
+            pendingFetchRequestFuture.whenComplete((value, exception) -> {
+                if (exception != null) {

Review Comment:
Given that you only complete the pending event successfully in the finally block of `poll()`, you could probably just `future.complete(null)`.
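
The simplification suggested in the review comment on `requestFetch()` can be illustrated with a small standalone sketch. It is not the PR's code: the class name `FetchFutureChainingSketch` and both helper methods are hypothetical, and the body of the `exception != null` branch is an assumption, since the quoted hunk is cut off at that line. The sketch only shows that, when the pending future is always completed successfully, a chained future that propagates both outcomes and one that simply calls `future.complete(null)` behave the same.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical standalone sketch; names are illustrative and not taken from the PR.
public class FetchFutureChainingSketch {

    // Assumed shape of the reviewed code: propagate either outcome of the pending future.
    static CompletableFuture<Void> chainWithExceptionBranch(CompletableFuture<Void> pending) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.whenComplete((value, exception) -> {
            if (exception != null) {
                // Never reached if the pending future is only ever completed successfully.
                future.completeExceptionally(exception);
            } else {
                future.complete(value);
            }
        });
        return future;
    }

    // Form suggested by the reviewer: complete the chained future unconditionally.
    static CompletableFuture<Void> chainSimplified(CompletableFuture<Void> pending) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.whenComplete((value, exception) -> future.complete(null));
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> a = chainWithExceptionBranch(pending);
        CompletableFuture<Void> b = chainSimplified(pending);

        System.out.println(a.isDone() + " " + b.isDone()); // prints "false false"

        // Stands in for the successful completion done in the finally block of poll().
        pending.complete(null);

        System.out.println(a.isDone() + " " + b.isDone()); // prints "true true"
    }
}
```

The two forms differ only if the pending future could ever fail: the simplified one would then complete normally and hide the failure, so it is safe only while the "always completed successfully" assumption holds.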
