smjn commented on code in PR #17270:
URL: https://github.com/apache/kafka/pull/17270#discussion_r1817325745


##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -0,0 +1,965 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates various handler classes corresponding to share
+ * state RPCs. It also holds an {@link InterBrokerSendThread} specialization
+ * which manages the sending the RPC requests over the network.
+ * This class is for the exclusive purpose of being used with {@link 
DefaultStatePersister}
+ * but can be extended for other {@link Persister} implementations as well.
+ */
+public class PersisterStateManager {
+    private SendThread sender;
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    public static final long REQUEST_BACKOFF_MS = 1_000L;
+    public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_FIND_COORD_ATTEMPTS = 5;
+    private final Time time;
+    private final Timer timer;
+    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
+    // holds the set of share coord nodes for each RPC type which is currently 
sent but not completed
+    private final Map<RPCType, Set<Node>> inFlight = new HashMap<>();
+
+    // Mapping for batchable RPCs. The top level grouping is based on 
destination share coordinator node.
+    // Since kafkaApis for each RPC type are separate, we cannot batch 
different types of RPCs. Hence, we need
+    // RPCType'd key inner map.
+    // The RPC schemas defined in kip-932 have a single group id per request. 
Hence, we cannot batch RPCs
+    // with different groupIds and therefore, another inner map keyed on 
groupId is needed.
+    // Finally, the value is a list of handlers
+    private final Map<Node, Map<RPCType, Map<String, 
List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>();
+
+    // Final object to serve synchronization needs.
+    private final Object nodeMapLock = new Object();
+
+    // Called when the generateRequests method is executed by 
InterBrokerSendThread, returning requests.
+    // Mainly for testing and introspection purpose to inspect the state of 
the nodeRPC map
+    // when generateRequests is called.
+    private Runnable generateCallback;
+
+    private static class BackoffManager {
+        private final int maxAttempts;
+        private int attempts;
+        private final ExponentialBackoff backoff;
+
+        BackoffManager(int maxAttempts, long initialBackoffMs, long 
maxBackoffMs) {
+            this.maxAttempts = maxAttempts;
+            this.backoff = new ExponentialBackoff(
+                initialBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                maxBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER
+            );
+        }
+
+        void incrementAttempt() {
+            attempts++;
+        }
+
+        void resetAttempts() {
+            attempts = 0;
+        }
+
+        boolean canAttempt() {
+            return attempts < maxAttempts;
+        }
+
+        long backOff() {
+            return this.backoff.backoff(attempts);
+        }
+    }
+
+    public enum RPCType {
+        READ,
+        WRITE,
+        DELETE,
+        SUMMARY,
+        UNKNOWN
+    }
+
+    public PersisterStateManager(KafkaClient client, 
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        if (client == null) {
+            throw new IllegalArgumentException("Kafkaclient must not be 
null.");
+        }
+        this.time = time == null ? Time.SYSTEM : time;
+        this.sender = new SendThread(
+            "PersisterStateManager",
+            client,
+            30_000,  //30 seconds
+            this.time,
+            true,
+            new Random(this.time.milliseconds()));
+        this.timer = timer;
+        this.cacheHelper = cacheHelper;
+    }
+
+    public void enqueue(PersisterStateManagerHandler handler) {
+        this.sender.enqueue(handler);
+    }
+
+    public void start() {
+        if (isStarted.compareAndSet(false, true)) {
+            this.sender.start();
+            isStarted.set(true);
+        }
+    }
+
+    public void stop() throws Exception {
+        if (isStarted.compareAndSet(true, false)) {
+            this.sender.shutdown();
+            Utils.closeQuietly(this.timer, "PersisterStateManager timer");
+        }
+    }
+
+    // test visibility
+    Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> 
nodeRPCMap() {
+        return nodeRPCMap;
+    }
+
+    public void setGenerateCallback(Runnable generateCallback) {
+        this.generateCallback = generateCallback;
+    }
+
+    /**
+     * Parent class of all RPCs. Uses template pattern to implement core 
methods.
+     * Various child classes can extend this class to define how to handle RPC 
specific
+     * responses, retries, batching etc.
+     * <p>
+     * Since the find coordinator RPC/lookup is a necessary pre-condition for 
all
+     * share state RPCs, the infra code for it is encapsulated in this class 
itself.
+     */
+    public abstract class PersisterStateManagerHandler implements 
RequestCompletionHandler {
+        protected Node coordinatorNode;
+        protected final String groupId;
+        protected final Uuid topicId;
+        protected final int partition;
+        private final BackoffManager findCoordBackoff;
+        protected final Logger log = LoggerFactory.getLogger(getClass());
+        private Consumer<ClientResponse> onCompleteCallback;
+
+        public PersisterStateManagerHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts
+        ) {
+            this.groupId = groupId;
+            this.topicId = topicId;
+            this.partition = partition;
+            this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);

Review Comment:
   We could; however, I am relying on coordinatorNode being null. After the 
FIND_COORD response it holds either a valid node or noNode, so I need a separate 
value to signify that we have not yet sent a FIND_COORD request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to