AndrewJSchofield commented on code in PR #16461: URL: https://github.com/apache/kafka/pull/16461#discussion_r1673903246
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ########## @@ -0,0 +1,888 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.message.ShareAcknowledgeRequestData; +import org.apache.kafka.common.message.ShareFetchRequestData; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; 
+import org.apache.kafka.common.requests.ShareAcknowledgeRequest; +import org.apache.kafka.common.requests.ShareAcknowledgeResponse; +import org.apache.kafka.common.requests.ShareFetchRequest; +import org.apache.kafka.common.requests.ShareFetchResponse; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; + +import org.slf4j.Logger; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +/** + * {@code ShareConsumeRequestManager} is responsible for generating {@link ShareFetchRequest} and + * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being delivered for a consumer + * in a share group. 
+ */ +public class ShareConsumeRequestManager implements RequestManager, MemberStateListener, Closeable { + private final Time time; + private final Logger log; + private final LogContext logContext; + private final String groupId; + private final ConsumerMetadata metadata; + private final SubscriptionState subscriptions; + private final FetchConfig fetchConfig; + protected final ShareFetchBuffer shareFetchBuffer; + private final BackgroundEventHandler backgroundEventHandler; + private final Map<Integer, ShareSessionHandler> sessionHandlers; + private final Set<Integer> nodesWithPendingRequests; + private final ShareFetchMetricsManager metricsManager; + private final IdempotentCloser idempotentCloser = new IdempotentCloser(); + private Uuid memberId; + private boolean fetchMoreRecords = false; + private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap; + private final Queue<AcknowledgeRequestState> acknowledgeRequestStates; + private final long retryBackoffMs; + private final long retryBackoffMaxMs; + private boolean closing = false; + + ShareConsumeRequestManager(final Time time, + final LogContext logContext, + final String groupId, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final ShareFetchBuffer shareFetchBuffer, + final BackgroundEventHandler backgroundEventHandler, + final ShareFetchMetricsManager metricsManager, + final long retryBackoffMs, + final long retryBackoffMaxMs) { + this.time = time; + this.log = logContext.logger(ShareConsumeRequestManager.class); + this.logContext = logContext; + this.groupId = groupId; + this.metadata = metadata; + this.subscriptions = subscriptions; + this.fetchConfig = fetchConfig; + this.shareFetchBuffer = shareFetchBuffer; + this.backgroundEventHandler = backgroundEventHandler; + this.metricsManager = metricsManager; + this.retryBackoffMs = retryBackoffMs; + this.retryBackoffMaxMs = retryBackoffMaxMs; + this.sessionHandlers = new 
HashMap<>(); + this.nodesWithPendingRequests = new HashSet<>(); + this.acknowledgeRequestStates = new LinkedList<>(); + this.fetchAcknowledgementsMap = new HashMap<>(); + } + + @Override + public PollResult poll(long currentTimeMs) { + if (memberId == null) { + return PollResult.EMPTY; + } + + // Send any pending acknowledgements before fetching more records. + PollResult pollResult = processAcknowledgements(currentTimeMs); + if (pollResult != null) { + return pollResult; + } + + if (!fetchMoreRecords) { + return PollResult.EMPTY; + } + + Map<Node, ShareSessionHandler> handlerMap = new HashMap<>(); + Map<String, Uuid> topicIds = metadata.topicIds(); + for (TopicPartition partition : partitionsToFetch()) { + Optional<Node> leaderOpt = metadata.currentLeader(partition).leader; + + if (!leaderOpt.isPresent()) { + log.debug("Requesting metadata update for partition {} since current leader node is missing", partition); + metadata.requestUpdate(false); + continue; + } + + Uuid topicId = topicIds.get(partition.topic()); + if (topicId == null) { + log.debug("Requesting metadata update for partition {} since topic ID is missing", partition); + metadata.requestUpdate(false); + continue; + } + + Node node = leaderOpt.get(); + if (nodesWithPendingRequests.contains(node.id())) { + log.trace("Skipping fetch for partition {} because previous fetch request to {} has not been processed", partition, node.id()); + } else { + // if there is a leader and no in-flight requests, issue a new fetch + ShareSessionHandler handler = handlerMap.computeIfAbsent(node, k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); + + TopicIdPartition tip = new TopicIdPartition(topicId, partition); + Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip); + if (acknowledgementsToSend != null) { + metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + } + handler.addPartitionToFetch(tip, acknowledgementsToSend); + + 
log.debug("Added fetch request for partition {} to node {}", partition, node); + } + } + + Map<Node, ShareFetchRequest.Builder> builderMap = new LinkedHashMap<>(); + for (Map.Entry<Node, ShareSessionHandler> entry : handlerMap.entrySet()) { + builderMap.put(entry.getKey(), entry.getValue().newShareFetchBuilder(groupId, fetchConfig)); + } + + List<UnsentRequest> requests = builderMap.entrySet().stream().map(entry -> { + Node target = entry.getKey(); + ShareFetchRequest.Builder requestBuilder = entry.getValue(); + + nodesWithPendingRequests.add(target.id()); + + BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> { + if (error != null) { + handleShareFetchFailure(target, requestBuilder.data(), error); + } else { + handleShareFetchSuccess(target, requestBuilder.data(), clientResponse); + } + }; + return new UnsentRequest(requestBuilder, Optional.of(target)).whenComplete(responseHandler); + }).collect(Collectors.toList()); + + return new PollResult(requests); + } + + public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) { + if (!fetchMoreRecords) { + log.debug("Fetch more data"); + fetchMoreRecords = true; + } + acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge)); + } + + /** + * Process acknowledgeRequestStates and prepares a list of acknowledgements to be sent in the poll(). + * + * @param currentTimeMs the current time in ms. + * + * @return the PollResult containing zero or more acknowledgements. 
+ */ + private PollResult processAcknowledgements(long currentTimeMs) { + List<UnsentRequest> unsentRequests = new ArrayList<>(); + Iterator<AcknowledgeRequestState> iterator = acknowledgeRequestStates.iterator(); + while (iterator.hasNext()) { + AcknowledgeRequestState acknowledgeRequestState = iterator.next(); + if (acknowledgeRequestState.isProcessed()) { + iterator.remove(); + } else if (!acknowledgeRequestState.maybeExpire()) { + if (nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) { + log.trace("Skipping acknowledge request because previous request to {} has not been processed", acknowledgeRequestState.nodeId); + } else { + if (acknowledgeRequestState.canSendRequest(currentTimeMs)) { + acknowledgeRequestState.onSendAttempt(currentTimeMs); + UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); + if (request != null) { + unsentRequests.add(request); + } + } + } + } else { + // Fill in TimeoutException + for (TopicIdPartition tip : acknowledgeRequestState.acknowledgementsMap.keySet()) { + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.REQUEST_TIMED_OUT); + } + iterator.remove(); + } + } + + PollResult pollResult = null; + if (!unsentRequests.isEmpty()) { + pollResult = new PollResult(unsentRequests); + } else if (!acknowledgeRequestStates.isEmpty()) { + // Return empty result until all the acknowledgement request states are processed + pollResult = PollResult.EMPTY; + } + return pollResult; + } + + /** + * Enqueue an AcknowledgeRequestState to be picked up on the next poll + * + * @param acknowledgementsMap The acknowledgements to commit + * @param deadlineMs Time until which the request will be retried if it fails with + * an expected retriable error. 
+ * + * @return The future which completes when the acknowledgements have finished + */ + public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync( + final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, + final long deadlineMs) { + final AtomicInteger resultCount = new AtomicInteger(); + final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>(); + final CommitResultHandler resultHandler = new CommitResultHandler(resultCount, Optional.of(future)); + + final Cluster cluster = metadata.fetch(); + + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Node node = cluster.nodeById(nodeId); + if (node != null) { + Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>(); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = acknowledgementsMap.get(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); + + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + resultCount.incrementAndGet(); + } + } + acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":1", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + this::handleShareAcknowledgeSuccess, + this::handleShareAcknowledgeFailure, + resultHandler + )); + } + }); + + resultHandler.completeIfEmpty(); + return future; + } + + /** + * Enqueue an AcknowledgeRequestState to be picked up on the next poll. 
+ * + * @param acknowledgementsMap The acknowledgements to commit + */ + public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) { + final Cluster cluster = metadata.fetch(); + final AtomicInteger resultCount = new AtomicInteger(); + final CommitResultHandler resultHandler = new CommitResultHandler(resultCount, Optional.empty()); + + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Node node = cluster.nodeById(nodeId); + if (node != null) { + Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>(); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = acknowledgementsMap.get(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); + + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + resultCount.incrementAndGet(); + } + } + acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":2", + Long.MAX_VALUE, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + this::handleShareAcknowledgeSuccess, + this::handleShareAcknowledgeFailure, + resultHandler + )); + } + }); + + resultHandler.completeIfEmpty(); + } + + /** + * Enqueue the final AcknowledgeRequestState used to commit the final acknowledgements and + * close the share sessions. + * + * @param acknowledgementsMap The acknowledgements to commit + * @param deadlineMs Time until which the request will be retried if it fails with + * an expected retriable error. 
+ * + * @return The future which completes when the acknowledgements finished + */ + public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, + final long deadlineMs) { + final Cluster cluster = metadata.fetch(); + final AtomicInteger resultCount = new AtomicInteger(); + final CompletableFuture<Void> future = new CompletableFuture<>(); + final CloseResultHandler resultHandler = new CloseResultHandler(resultCount, future); + + closing = true; + + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Node node = cluster.nodeById(nodeId); + if (node != null) { + Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>(); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = acknowledgementsMap.get(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); + + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node); + resultCount.incrementAndGet(); + } + } + acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":3", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + this::handleShareAcknowledgeCloseSuccess, + this::handleShareAcknowledgeCloseFailure, + resultHandler, + true + )); + } + }); + + resultHandler.completeIfEmpty(); + return future; + } + + private void handleShareFetchSuccess(Node fetchTarget, + ShareFetchRequestData requestData, Review Comment: Yes, for now. I've suppressed the warning since I expect to revisit the error handling here shortly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
