smjn commented on code in PR #17270: URL: https://github.com/apache/kafka/pull/17270#discussion_r1793222832
########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -0,0 +1,863 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.server.util.InterBrokerSendThread; +import org.apache.kafka.server.util.RequestAndCompletionHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class encapsulates various handler classes corresponding to share + * state RPCs. It also holds an {@link InterBrokerSendThread} specialization + * which manages the sending the RPC requests over the network. + * This class is for the exclusive purpose of being used with {@link DefaultStatePersister} + * but can be extended for other {@link Persister} implementations as well. + */ +public class PersisterStateManager { + + private SendThread sender; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final ShareCoordinatorMetadataCacheHelper cacheHelper; + public static final long REQUEST_BACKOFF_MS = 1_000L; + public static final long REQUEST_BACKOFF_MAX_MS = 30_000L; + private static final int MAX_FIND_COORD_ATTEMPTS = 5; + private final Time time; + // holds the set of share coord nodes for each RPC type which is currently sent but not completed + private final Map<RPCType, Set<Node>> inFlight = new HashMap<>(); + + // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node. + // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need + // RPCType'd key inner map. + // The RPC schemas defined in kip-932 have a single group id per request. Hence, we cannot batch RPCs + // with different groupIds and therefore, another inner map keyed on groupId is needed. + // Finally, the value is a list of handlers + private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>(); + + // Final object to serve synchronization needs. + private final Object nodeMapLock = new Object(); + + // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests. + // Mainly for testing and introspection purpose to inspect the state of the nodeRPC map + // when generateRequests is called. + private Runnable generateCallback; + + public enum RPCType { + READ, + WRITE, + DELETE, + SUMMARY, + UNKNOWN + } + + public PersisterStateManager(KafkaClient client, Time time, ShareCoordinatorMetadataCacheHelper cacheHelper) { + if (client == null) { + throw new IllegalArgumentException("Kafkaclient must not be null."); + } + if (cacheHelper == null) { + throw new IllegalArgumentException("ShareCoordinatorMetadataCacheHelper must not be null."); + } + this.cacheHelper = cacheHelper; + this.time = time == null ? Time.SYSTEM : time; + this.sender = new SendThread( + "PersisterStateManager", + client, + 30_000, //30 seconds + this.time, + true, + new Random(this.time.milliseconds())); + } + + public void enqueue(PersisterStateManagerHandler handler) { + this.sender.enqueue(handler); + } + + public void start() { + if (!isStarted.get()) { + this.sender.start(); + isStarted.set(true); + } + } + + public void stop() throws InterruptedException { + this.sender.shutdown(); + } + + // test visibility + Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap() { + return nodeRPCMap; + } + + public void setGenerateCallback(Runnable generateCallback) { + this.generateCallback = generateCallback; + } + + /** + * Parent class of all RPCs. Uses template pattern to implement core methods. + * Various child classes can extend this class to define how to handle RPC specific + * responses, retries, batching etc. + * <p> + * Since the find coordinator RPC/lookup is a necessary pre-condition for all + * share state RPCs, the infra code for it is encapsulated in this class itself. + */ + public abstract class PersisterStateManagerHandler implements RequestCompletionHandler { + protected Node coordinatorNode; + protected final String groupId; + protected final Uuid topicId; + protected final int partition; + private final ExponentialBackoff findCoordBackoff; + private int findCoordAttempts = 0; + private final int maxFindCoordAttempts; + protected final Logger log = LoggerFactory.getLogger(getClass()); + private Consumer<ClientResponse> onCompleteCallback; + + public PersisterStateManagerHandler( + String groupId, + Uuid topicId, + int partition, + long backoffMs, + long backoffMaxMs, + int maxFindCoordAttempts + ) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + this.findCoordBackoff = new ExponentialBackoff( + backoffMs, + CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, + backoffMaxMs, + CommonClientConfigs.RETRY_BACKOFF_JITTER); + this.maxFindCoordAttempts = maxFindCoordAttempts; + } + + /** + * Child class must create appropriate builder object for the handled RPC + * + * @return builder for the request + */ + protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder(); + + /** + * Handles the response for an RPC. + * + * @param response - Client response + */ + protected abstract void handleRequestResponse(ClientResponse response); + + /** + * Returns true if the response if valid for the respective child class. + * + * @param response - Client response + * @return - boolean + */ + protected abstract boolean isRequestResponse(ClientResponse response); + + /** + * Handle invalid find coordinator response. If error is UNKNOWN_SERVER_ERROR. Look at the + * exception details to figure out the problem. + * + * @param error + * @param exception + */ + protected abstract void findCoordinatorErrorResponse(Errors error, Exception exception); + + /** + * Child class must provide a descriptive name for the implementation. + * + * @return String + */ + protected abstract String name(); + + /** + * Child class must return appropriate type of RPC here + * + * @return String + */ + protected abstract RPCType rpcType(); + + /** + * Returns builder for share coordinator + * + * @return builder for find coordinator + */ + protected AbstractRequest.Builder<FindCoordinatorRequest> findShareCoordinatorBuilder() { + return new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id()) + .setKey(coordinatorKey())); + } + + public void addRequestToNodeMap(Node node, PersisterStateManagerHandler handler) { + if (!handler.isBatchable()) { + return; + } + synchronized (nodeMapLock) { + nodeRPCMap.computeIfAbsent(node, k -> new HashMap<>()) + .computeIfAbsent(handler.rpcType(), k -> new HashMap<>()) + .computeIfAbsent(handler.groupId, k -> new LinkedList<>()) + .add(handler); + } + sender.wakeup(); + } + + /** + * Returns true is coordinator node is not yet set + * + * @return boolean + */ + protected boolean lookupNeeded() { + if (coordinatorNode != null) { + return false; + } + if (cacheHelper.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME)) { + log.debug("{} internal topic already exists.", Topic.SHARE_GROUP_STATE_TOPIC_NAME); + Node node = cacheHelper.getShareCoordinator(coordinatorKey(), Topic.SHARE_GROUP_STATE_TOPIC_NAME); + if (node != Node.noNode()) { + log.debug("Found coordinator node in cache: {}", node); + coordinatorNode = node; + addRequestToNodeMap(node, this); + return false; + } + } + return true; + } + + /** + * Returns the String key to be used as share coordinator key + * + * @return String + */ + protected String coordinatorKey() { + return SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + } + + /** + * Returns true if the RPC response if for Find Coordinator RPC. + * + * @param response - Client response object + * @return boolean + */ + protected boolean isFindCoordinatorResponse(ClientResponse response) { + return response != null && response.requestHeader().apiKey() == ApiKeys.FIND_COORDINATOR; + } + + @Override + public void onComplete(ClientResponse response) { + if (onCompleteCallback != null) { + onCompleteCallback.accept(response); + } + if (response != null && response.hasResponse()) { + if (isFindCoordinatorResponse(response)) { + handleFindCoordinatorResponse(response); + } else if (isRequestResponse(response)) { + handleRequestResponse(response); + } + } + sender.wakeup(); + } + + private void resetAttempts() { + findCoordAttempts = 0; + } + + private void resetCoordinatorNode() { + coordinatorNode = null; + } + + /** + * Handles the response for find coordinator RPC and sets appropriate state. + * + * @param response - Client response for find coordinator RPC + */ + protected void handleFindCoordinatorResponse(ClientResponse response) { + log.debug("Find coordinator response received - {}", response); + + // Incrementing the number of find coordinator attempts + findCoordAttempts++; + List<FindCoordinatorResponseData.Coordinator> coordinators = ((FindCoordinatorResponse) response.responseBody()).coordinators(); + if (coordinators.size() != 1) { + log.error("Find coordinator response for {} is invalid", coordinatorKey()); + findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new IllegalStateException("Invalid response with multiple coordinators.")); + return; + } + + FindCoordinatorResponseData.Coordinator coordinatorData = coordinators.get(0); + Errors error = Errors.forCode(coordinatorData.errorCode()); + + switch (error) { + case NONE: + log.debug("Find coordinator response valid. Enqueuing actual request."); + resetAttempts(); + coordinatorNode = new Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port()); + // now we want the actual share state RPC call to happen + if (this.isBatchable()) { + addRequestToNodeMap(coordinatorNode, this); + } else { + enqueue(this); + } + break; + + case COORDINATOR_NOT_AVAILABLE: // retriable error codes + case COORDINATOR_LOAD_IN_PROGRESS: + log.warn("Received retriable error in find coordinator: {}", error.message()); + if (findCoordAttempts >= this.maxFindCoordAttempts) { + log.error("Exhausted max retries to find coordinator without success."); + findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success.")); + break; + } + log.debug("Waiting before retrying find coordinator RPC."); + try { + TimeUnit.MILLISECONDS.sleep(findCoordBackoff.backoff(findCoordAttempts)); + } catch (InterruptedException e) { + log.warn("Interrupted waiting before retrying find coordinator request", e); + } + resetCoordinatorNode(); + enqueue(this); + break; + + default: + log.error("Unable to find coordinator."); + findCoordinatorErrorResponse(error, null); + } + } + + // Visible for testing + public Node getCoordinatorNode() { + return coordinatorNode; + } + + protected abstract boolean isBatchable(); + + /** + * This method can be called by child class objects to register a callback + * which will be called when the onComplete cb is called on request completion. + * @param callback + */ + protected void setOnCompleteCallback(Consumer<ClientResponse> callback) { + this.onCompleteCallback = callback; + } + } + + public class WriteStateHandler extends PersisterStateManagerHandler { + private final int stateEpoch; + private final int leaderEpoch; + private final long startOffset; + private final List<PersisterStateBatch> batches; + private final CompletableFuture<WriteShareGroupStateResponse> result; + + public WriteStateHandler( + String groupId, + Uuid topicId, + int partition, + int stateEpoch, + int leaderEpoch, + long startOffset, + List<PersisterStateBatch> batches, + CompletableFuture<WriteShareGroupStateResponse> result, + long backoffMs, + long backoffMaxMs, + int maxFindCoordAttempts + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxFindCoordAttempts); + this.stateEpoch = stateEpoch; + this.leaderEpoch = leaderEpoch; + this.startOffset = startOffset; + this.batches = batches; + this.result = result; + } + + public WriteStateHandler( + String groupId, + Uuid topicId, + int partition, + int stateEpoch, + int leaderEpoch, + long startOffset, + List<PersisterStateBatch> batches, + CompletableFuture<WriteShareGroupStateResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + stateEpoch, + leaderEpoch, + startOffset, + batches, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS + ); + } + + @Override + protected String name() { + return "WriteStateHandler"; + } + + @Override + protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() { + throw new RuntimeException("Write requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isRequestResponse(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.WRITE_SHARE_GROUP_STATE; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Write state response received - {}", response); + // response can be a combined one for large number of requests + // we need to deconstruct it + WriteShareGroupStateResponse combinedResponse = (WriteShareGroupStateResponse) response.responseBody(); + for (WriteShareGroupStateResponseData.WriteStateResult writeStateResult : combinedResponse.data().results()) { + if (writeStateResult.topicId().equals(topicId)) { + Optional<WriteShareGroupStateResponseData.PartitionResult> partitionStateData = + writeStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partition) + .findFirst(); + if (partitionStateData.isPresent()) { + WriteShareGroupStateResponseData.WriteStateResult result = WriteShareGroupStateResponse.toResponseWriteStateResult(topicId, Collections.singletonList(partitionStateData.get())); + this.result.complete(new WriteShareGroupStateResponse( + new WriteShareGroupStateResponseData().setResults(Collections.singletonList(result)))); + return; + } + } + } + } + + @Override + protected void findCoordinatorErrorResponse(Errors error, Exception exception) { + this.result.complete(new WriteShareGroupStateResponse( + WriteShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in find coordinator. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + // Visible for testing + public CompletableFuture<WriteShareGroupStateResponse> getResult() { + return result; + } + + @Override + protected boolean isBatchable() { + return true; + } + + @Override + protected RPCType rpcType() { + return RPCType.WRITE; + } + } + + public class ReadStateHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final String coordinatorKey; + private final CompletableFuture<ReadShareGroupStateResponse> result; + + public ReadStateHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateResponse> result, + long backoffMs, + long backoffMaxMs, + int maxFindCoordAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxFindCoordAttempts); + this.leaderEpoch = leaderEpoch; + this.coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + this.result = result; + } + + public ReadStateHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateRequest> requestBuilder() { + throw new RuntimeException("Read requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isRequestResponse(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state response received - {}", response); + + ReadShareGroupStateResponse combinedResponse = (ReadShareGroupStateResponse) response.responseBody(); + ReadShareGroupStateResponseData readShareGroupStateResponseData = new ReadShareGroupStateResponseData(); + for (ReadShareGroupStateResponseData.ReadStateResult readStateResult : combinedResponse.data().results()) { + if (readStateResult.topicId().equals(topicId)) { + Optional<ReadShareGroupStateResponseData.PartitionResult> partitionStateData = + readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partition) + .findFirst(); + + if (partitionStateData.isPresent()) { + ReadShareGroupStateResponseData.ReadStateResult result = ReadShareGroupStateResponse.toResponseReadStateResult(topicId, Collections.singletonList(partitionStateData.get())); + readShareGroupStateResponseData.setResults(Collections.singletonList(result)); + break; + } + } + } + + String errorMessage = "Failed to read state for partition " + partition + " in topic " + topicId + " for group " + groupId; + if (readShareGroupStateResponseData.results().size() != 1) { + log.error("ReadState response for {} is invalid", coordinatorKey); + this.result.complete(new ReadShareGroupStateResponse( + ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, Errors.forException(new IllegalStateException(errorMessage)), errorMessage))); + } + ReadShareGroupStateResponseData.ReadStateResult topicData = readShareGroupStateResponseData.results().get(0); + if (!topicData.topicId().equals(topicId)) { + log.error("ReadState response for {} is invalid", coordinatorKey); + this.result.complete(new ReadShareGroupStateResponse( + ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, Errors.forException(new IllegalStateException(errorMessage)), errorMessage))); + } + if (topicData.partitions().size() != 1) { + log.error("ReadState response for {} is invalid", coordinatorKey); + this.result.complete(new ReadShareGroupStateResponse( + ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, Errors.forException(new IllegalStateException(errorMessage)), errorMessage))); + } + ReadShareGroupStateResponseData.PartitionResult partitionResponse = topicData.partitions().get(0); + if (partitionResponse.partition() != partition) { + log.error("ReadState response for {} is invalid", coordinatorKey); + this.result.complete(new ReadShareGroupStateResponse( + ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, Errors.forException(new IllegalStateException(errorMessage)), errorMessage))); + } + result.complete(new ReadShareGroupStateResponse(readShareGroupStateResponseData)); + } + + @Override + protected void findCoordinatorErrorResponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateResponse( + ReadShareGroupStateResponse.toErrorResponseData(topicId, partition, error, "Error in find coordinator. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + // Visible for testing + public CompletableFuture<ReadShareGroupStateResponse> getResult() { + return result; + } + + @Override + protected boolean isBatchable() { + return true; + } + + @Override + protected RPCType rpcType() { + return RPCType.READ; + } + } + + private class SendThread extends InterBrokerSendThread { + private final ConcurrentLinkedQueue<PersisterStateManagerHandler> queue = new ConcurrentLinkedQueue<>(); + private final Random random; + + public SendThread(String name, KafkaClient networkClient, int requestTimeoutMs, Time time, boolean isInterruptible, Random random) { + super(name, networkClient, requestTimeoutMs, time, isInterruptible); + this.random = random; + } + + private Node randomNode() { + List<Node> nodes = cacheHelper.getClusterNodes(); + if (nodes == null || nodes.isEmpty()) { + return Node.noNode(); + } + return nodes.get(random.nextInt(nodes.size())); + } + + /** + * The incoming requests will have the keys in the following format + * groupId: [ + * topidId1: [part1, part2, part3], + * topicId2: [part1, part2, part3] + * ... + * ] + * Hence, the total number of keys would be 1 x m x n (1 is for the groupId) where m is number of topicIds + * and n is number of partitions specified per topicId. + * <p> + * For each RPC, we need to identify the coordinator node first. + * If the internal share state topic is not found in the metadata cache, when RPC is received + * we will need to make a FIND_COORDINATOR RPC which will have the side effect of creating the internal + * topic as well. If the node is found in the cache, we will use it directly. + * + * @return list of requests to send + */ + @Override + public Collection<RequestAndCompletionHandler> generateRequests() { + // There are two sources for requests here: + // 1. A queue which will contain FIND_CORD RPCs and other non-batchable RPCs. + // 2. A hashMap keyed on the share coordinator nodes which may contain batched requests. + + if (generateCallback != null) { + generateCallback.run(); + } + List<RequestAndCompletionHandler> requests = new ArrayList<>(); + + // honor queue first as find coordinator + // is mandatory for batching and sending the + // request to correct destination node + if (!queue.isEmpty()) { + PersisterStateManagerHandler handler = queue.peek(); + queue.poll(); + if (handler.lookupNeeded()) { + // we need to find the coordinator node + Node randomNode = randomNode(); + if (randomNode == Node.noNode()) { + log.error("Unable to find node to use for coordinator lookup."); + // fatal failure, cannot retry or progress + // fail the RPC + handler.findCoordinatorErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Errors.COORDINATOR_NOT_AVAILABLE.exception()); + return Collections.emptyList(); + } + log.debug("Sending find coordinator RPC"); + return Collections.singletonList(new RequestAndCompletionHandler( + time.milliseconds(), + randomNode, + handler.findShareCoordinatorBuilder(), + handler + )); + } else { + // useful for tests and + // other RPCs which might not be batchable + if (!handler.isBatchable()) { Review Comment: We have this tested. We have written a TestHandler in PersisterStateManager which is not batchable and did a few tests with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
