AndrewJSchofield commented on code in PR #16461:
URL: https://github.com/apache/kafka/pull/16461#discussion_r1673903246


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -0,0 +1,888 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
+import org.apache.kafka.common.message.ShareFetchRequestData;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
+import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/**
+ * {@code ShareConsumeRequestManager} is responsible for generating {@link 
ShareFetchRequest} and
+ * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being 
delivered for a consumer
+ * in a share group.
+ */
+public class ShareConsumeRequestManager implements RequestManager, 
MemberStateListener, Closeable {
+    private final Time time;
+    private final Logger log;
+    private final LogContext logContext;
+    private final String groupId;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final FetchConfig fetchConfig;
+    protected final ShareFetchBuffer shareFetchBuffer;
+    private final BackgroundEventHandler backgroundEventHandler;
+    private final Map<Integer, ShareSessionHandler> sessionHandlers;
+    private final Set<Integer> nodesWithPendingRequests;
+    private final ShareFetchMetricsManager metricsManager;
+    private final IdempotentCloser idempotentCloser = new IdempotentCloser();
+    private Uuid memberId;
+    private boolean fetchMoreRecords = false;
+    private final Map<TopicIdPartition, Acknowledgements> 
fetchAcknowledgementsMap;
+    private final Queue<AcknowledgeRequestState> acknowledgeRequestStates;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private boolean closing = false;
+
+    ShareConsumeRequestManager(final Time time,
+                               final LogContext logContext,
+                               final String groupId,
+                               final ConsumerMetadata metadata,
+                               final SubscriptionState subscriptions,
+                               final FetchConfig fetchConfig,
+                               final ShareFetchBuffer shareFetchBuffer,
+                               final BackgroundEventHandler 
backgroundEventHandler,
+                               final ShareFetchMetricsManager metricsManager,
+                               final long retryBackoffMs,
+                               final long retryBackoffMaxMs) {
+        this.time = time;
+        this.log = logContext.logger(ShareConsumeRequestManager.class);
+        this.logContext = logContext;
+        this.groupId = groupId;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.fetchConfig = fetchConfig;
+        this.shareFetchBuffer = shareFetchBuffer;
+        this.backgroundEventHandler = backgroundEventHandler;
+        this.metricsManager = metricsManager;
+        this.retryBackoffMs = retryBackoffMs;
+        this.retryBackoffMaxMs = retryBackoffMaxMs;
+        this.sessionHandlers = new HashMap<>();
+        this.nodesWithPendingRequests = new HashSet<>();
+        this.acknowledgeRequestStates = new LinkedList<>();
+        this.fetchAcknowledgementsMap = new HashMap<>();
+    }
+
+    @Override
+    public PollResult poll(long currentTimeMs) {
+        if (memberId == null) {
+            return PollResult.EMPTY;
+        }
+
+        // Send any pending acknowledgements before fetching more records.
+        PollResult pollResult = processAcknowledgements(currentTimeMs);
+        if (pollResult != null) {
+            return pollResult;
+        }
+
+        if (!fetchMoreRecords) {
+            return PollResult.EMPTY;
+        }
+
+        Map<Node, ShareSessionHandler> handlerMap = new HashMap<>();
+        Map<String, Uuid> topicIds = metadata.topicIds();
+        for (TopicPartition partition : partitionsToFetch()) {
+            Optional<Node> leaderOpt = 
metadata.currentLeader(partition).leader;
+
+            if (!leaderOpt.isPresent()) {
+                log.debug("Requesting metadata update for partition {} since 
current leader node is missing", partition);
+                metadata.requestUpdate(false);
+                continue;
+            }
+
+            Uuid topicId = topicIds.get(partition.topic());
+            if (topicId == null) {
+                log.debug("Requesting metadata update for partition {} since 
topic ID is missing", partition);
+                metadata.requestUpdate(false);
+                continue;
+            }
+
+            Node node = leaderOpt.get();
+            if (nodesWithPendingRequests.contains(node.id())) {
+                log.trace("Skipping fetch for partition {} because previous 
fetch request to {} has not been processed", partition, node.id());
+            } else {
+                // if there is a leader and no in-flight requests, issue a new 
fetch
+                ShareSessionHandler handler = handlerMap.computeIfAbsent(node, 
k -> sessionHandlers.computeIfAbsent(node.id(), n -> new 
ShareSessionHandler(logContext, n, memberId)));
+
+                TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
+                Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsMap.get(tip);
+                if (acknowledgementsToSend != null) {
+                    
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                }
+                handler.addPartitionToFetch(tip, acknowledgementsToSend);
+
+                log.debug("Added fetch request for partition {} to node {}", 
partition, node);
+            }
+        }
+
+        Map<Node, ShareFetchRequest.Builder> builderMap = new 
LinkedHashMap<>();
+        for (Map.Entry<Node, ShareSessionHandler> entry : 
handlerMap.entrySet()) {
+            builderMap.put(entry.getKey(), 
entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+        }
+
+        List<UnsentRequest> requests = 
builderMap.entrySet().stream().map(entry -> {
+            Node target = entry.getKey();
+            ShareFetchRequest.Builder requestBuilder = entry.getValue();
+
+            nodesWithPendingRequests.add(target.id());
+
+            BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, error) -> {
+                if (error != null) {
+                    handleShareFetchFailure(target, requestBuilder.data(), 
error);
+                } else {
+                    handleShareFetchSuccess(target, requestBuilder.data(), 
clientResponse);
+                }
+            };
+            return new UnsentRequest(requestBuilder, 
Optional.of(target)).whenComplete(responseHandler);
+        }).collect(Collectors.toList());
+
+        return new PollResult(requests);
+    }
+
+    public void fetch(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
+        if (!fetchMoreRecords) {
+            log.debug("Fetch more data");
+            fetchMoreRecords = true;
+        }
+        acknowledgementsMap.forEach((tip, acks) -> 
fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge));
+    }
+
+    /**
+     * Process acknowledgeRequestStates and prepares a list of 
acknowledgements to be sent in the poll().
+     *
+     * @param currentTimeMs the current time in ms.
+     *
+     * @return the PollResult containing zero or more acknowledgements.
+     */
+    private PollResult processAcknowledgements(long currentTimeMs) {
+        List<UnsentRequest> unsentRequests = new ArrayList<>();
+        Iterator<AcknowledgeRequestState> iterator = 
acknowledgeRequestStates.iterator();
+        while (iterator.hasNext()) {
+            AcknowledgeRequestState acknowledgeRequestState = iterator.next();
+            if (acknowledgeRequestState.isProcessed()) {
+                iterator.remove();
+            } else if (!acknowledgeRequestState.maybeExpire()) {
+                if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                    log.trace("Skipping acknowledge request because previous 
request to {} has not been processed", acknowledgeRequestState.nodeId);
+                } else {
+                    if (acknowledgeRequestState.canSendRequest(currentTimeMs)) 
{
+                        acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                        UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);
+                        if (request != null) {
+                            unsentRequests.add(request);
+                        }
+                    }
+                }
+            } else {
+                // Fill in TimeoutException
+                for (TopicIdPartition tip : 
acknowledgeRequestState.acknowledgementsMap.keySet()) {
+                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.REQUEST_TIMED_OUT);
+                }
+                iterator.remove();
+            }
+        }
+
+        PollResult pollResult = null;
+        if (!unsentRequests.isEmpty()) {
+            pollResult = new PollResult(unsentRequests);
+        } else if (!acknowledgeRequestStates.isEmpty()) {
+            // Return empty result until all the acknowledgement request 
states are processed
+            pollResult = PollResult.EMPTY;
+        }
+        return pollResult;
+    }
+
+    /**
+     * Enqueue an AcknowledgeRequestState to be picked up on the next poll
+     *
+     * @param acknowledgementsMap The acknowledgements to commit
+     * @param deadlineMs          Time until which the request will be retried 
if it fails with
+     *                            an expected retriable error.
+     *
+     * @return The future which completes when the acknowledgements finished
+     */
+    public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> 
commitSync(
+            final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap,
+            final long deadlineMs) {
+        final AtomicInteger resultCount = new AtomicInteger();
+        final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> 
future = new CompletableFuture<>();
+        final CommitResultHandler resultHandler = new 
CommitResultHandler(resultCount, Optional.of(future));
+
+        final Cluster cluster = metadata.fetch();
+
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
+                for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
+                    Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
+                    if (acknowledgements != null) {
+                        acknowledgementsMapForNode.put(tip, acknowledgements);
+
+                        
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                        log.debug("Added acknowledge request for partition {} 
to node {}", tip.topicPartition(), node);
+                        resultCount.incrementAndGet();
+                    }
+                }
+                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
+                        ShareConsumeRequestManager.class.getSimpleName() + 
":1",
+                        deadlineMs,
+                        retryBackoffMs,
+                        retryBackoffMaxMs,
+                        sessionHandler,
+                        nodeId,
+                        acknowledgementsMapForNode,
+                        this::handleShareAcknowledgeSuccess,
+                        this::handleShareAcknowledgeFailure,
+                        resultHandler
+                ));
+            }
+        });
+
+        resultHandler.completeIfEmpty();
+        return future;
+    }
+
+    /**
+     * Enqueue an AcknowledgeRequestState to be picked up on the next poll.
+     *
+     * @param acknowledgementsMap The acknowledgements to commit
+     */
+    public void commitAsync(final Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
+        final Cluster cluster = metadata.fetch();
+        final AtomicInteger resultCount = new AtomicInteger();
+        final CommitResultHandler resultHandler = new 
CommitResultHandler(resultCount, Optional.empty());
+
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
+                for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
+                    Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
+                    if (acknowledgements != null) {
+                        acknowledgementsMapForNode.put(tip, acknowledgements);
+
+                        
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                        log.debug("Added acknowledge request for partition {} 
to node {}", tip.topicPartition(), node);
+                        resultCount.incrementAndGet();
+                    }
+                }
+                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
+                        ShareConsumeRequestManager.class.getSimpleName() + 
":2",
+                        Long.MAX_VALUE,
+                        retryBackoffMs,
+                        retryBackoffMaxMs,
+                        sessionHandler,
+                        nodeId,
+                        acknowledgementsMapForNode,
+                        this::handleShareAcknowledgeSuccess,
+                        this::handleShareAcknowledgeFailure,
+                        resultHandler
+                ));
+            }
+        });
+
+        resultHandler.completeIfEmpty();
+    }
+
+    /**
+     * Enqueue the final AcknowledgeRequestState used to commit the final 
acknowledgements and
+     * close the share sessions.
+     *
+     * @param acknowledgementsMap The acknowledgements to commit
+     * @param deadlineMs          Time until which the request will be retried 
if it fails with
+     *                            an expected retriable error.
+     *
+     * @return The future which completes when the acknowledgements finished
+     */
+    public CompletableFuture<Void> acknowledgeOnClose(final 
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap,
+                                                      final long deadlineMs) {
+        final Cluster cluster = metadata.fetch();
+        final AtomicInteger resultCount = new AtomicInteger();
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        final CloseResultHandler resultHandler = new 
CloseResultHandler(resultCount, future);
+
+        closing = true;
+
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
+                for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
+                    Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
+                    if (acknowledgements != null) {
+                        acknowledgementsMapForNode.put(tip, acknowledgements);
+
+                        
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                        log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(), node);
+                        resultCount.incrementAndGet();
+                    }
+                }
+                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
+                        ShareConsumeRequestManager.class.getSimpleName() + 
":3",
+                        deadlineMs,
+                        retryBackoffMs,
+                        retryBackoffMaxMs,
+                        sessionHandler,
+                        nodeId,
+                        acknowledgementsMapForNode,
+                        this::handleShareAcknowledgeCloseSuccess,
+                        this::handleShareAcknowledgeCloseFailure,
+                        resultHandler,
+                        true
+                ));
+            }
+        });
+
+        resultHandler.completeIfEmpty();
+        return future;
+    }
+
+    private void handleShareFetchSuccess(Node fetchTarget,
+                                         ShareFetchRequestData requestData,

Review Comment:
   Yes, for now. I've suppressed the warning since I expect to revisit the 
error handling here shortly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to