mumrah commented on code in PR #17270:
URL: https://github.com/apache/kafka/pull/17270#discussion_r1816970631


##########
core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
+    private final KRaftMetadataCache metadataCache;

Review Comment:
   Do we really need a reference to KRaftMetadataCache, or is MetadataCache 
sufficient? I would prefer this class to be in the share coordinator module 
rather than core.
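   For reference, a minimal sketch of the first suggestion, written against the MetadataCache trait rather than the KRaft-specific class. The `getAliveBrokerNode` call returning a Scala `Option[Node]` is an assumption inferred from the OptionConverters import already present in this file, and `getCoordinatorNode` is a hypothetical method name:
   
   ```
   // Sketch only: depend on the MetadataCache trait, not KRaftMetadataCache.
   // Assumes MetadataCache exposes getAliveBrokerNode(int, ListenerName): Option[Node].
   public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
       private final MetadataCache metadataCache;
       private final ListenerName interBrokerListenerName;
   
       public ShareCoordinatorMetadataCacheHelperImpl(MetadataCache metadataCache, ListenerName interBrokerListenerName) {
           this.metadataCache = Objects.requireNonNull(metadataCache);
           this.interBrokerListenerName = Objects.requireNonNull(interBrokerListenerName);
       }
   
       public Optional<Node> getCoordinatorNode(int brokerId) {
           // OptionConverters bridges the Scala Option returned by the trait.
           return OptionConverters.toJava(metadataCache.getAliveBrokerNode(brokerId, interBrokerListenerName));
       }
   }
   ```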



##########
share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of the {@link Persister} interface which is used by the
+ * group coordinator and share-partition leaders to manage the durable share-partition state.
+ * This implementation uses inter-broker RPCs to make requests to the share coordinator
+ * which is responsible for persisting the share-partition state.
+ */
+public class DefaultStatePersister implements Persister {
+    private final PersisterStateManager stateManager;
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class);
+
+    public DefaultStatePersister(PersisterStateManager stateManager) {
+        this.stateManager = stateManager;
+        this.stateManager.start();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (stateManager != null) {
+                stateManager.stop();
+            }
+        } catch (Exception e) {
+            log.error("Unable to stop state manager", e);
+        }
+    }
+
+    /**
+     * Used by the group coordinator to initialize the share-partition state.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request InitializeShareGroupStateParameters
+     * @return A completable future of InitializeShareGroupStateResult
+     */
+    public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException {
+        throw new RuntimeException("not implemented");
+    }
+
+    /**
+     * Used by share-partition leaders to write share-partition state to a share coordinator.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request WriteShareGroupStateParameters
+     * @return A completable future of WriteShareGroupStateResult
+     */
+    public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException {
+        validate(request);
+        GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+
+        Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<WriteShareGroupStateResponse> future = futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new WriteStateHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.stateEpoch(),
+                        partitionData.leaderEpoch(),
+                        partitionData.startOffset(),
+                        partitionData.stateBatches(),
+                        future, null)
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.WriteStateHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        return combinedFuture.thenApply(v -> {
+            List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream()
+                .map(topicId -> {
+                    List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream()
+                        .map(partitionFuture -> {
+                            int partition = partitionFuture.getKey();
+                            CompletableFuture<WriteShareGroupStateResponse> future = partitionFuture.getValue();
+                            try {
+                                WriteShareGroupStateResponse partitionResponse = future.get();
+                                return partitionResponse.data().results().get(0).partitions().stream()
+                                    .map(partitionResult -> PartitionFactory.newPartitionErrorData(

Review Comment:
   Here we use `newPartitionErrorData`, but in `readState` we use
   
   ```
   PartitionFactory.newPartitionAllData(
       partition,
       -1,
       -1,
       Errors.UNKNOWN_SERVER_ERROR.code(),   // No specific public error code exists for InterruptedException / ExecutionException
       "Error reading state from share coordinator: " + e.getMessage(),
       Collections.emptyList())
   ```
   
   Why the difference in error handling?
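   For illustration, one way to keep the two paths consistent is to centralize the exception-to-error mapping in small helpers. This is a sketch, not part of the PR; it only uses the factory methods visible in this diff, and the `PartitionAllData` return type is assumed from the factory method name:
   
   ```
   // Sketch: shared mapping from a failed future to per-partition error payloads,
   // so writeState and readState cannot drift apart in their error handling.
   private static PartitionErrorData writeErrorData(int partition, Exception e) {
       return PartitionFactory.newPartitionErrorData(
           partition,
           Errors.UNKNOWN_SERVER_ERROR.code(),
           "Error writing state to share coordinator: " + e.getMessage());
   }
   
   private static PartitionAllData readErrorData(int partition, Exception e) {
       return PartitionFactory.newPartitionAllData(
           partition,
           -1,   // state epoch unknown on failure
           -1,   // start offset unknown on failure
           Errors.UNKNOWN_SERVER_ERROR.code(),
           "Error reading state from share coordinator: " + e.getMessage(),
           Collections.emptyList());
   }
   ```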



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -0,0 +1,965 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates various handler classes corresponding to share
+ * state RPCs. It also holds an {@link InterBrokerSendThread} specialization
+ * which manages sending the RPC requests over the network.
+ * This class is for the exclusive purpose of being used with {@link DefaultStatePersister}
+ * but can be extended for other {@link Persister} implementations as well.
+ */
+public class PersisterStateManager {
+    private SendThread sender;
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    public static final long REQUEST_BACKOFF_MS = 1_000L;
+    public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_FIND_COORD_ATTEMPTS = 5;
+    private final Time time;
+    private final Timer timer;
+    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
+    // holds the set of share coord nodes for each RPC type which is currently sent but not completed
+    private final Map<RPCType, Set<Node>> inFlight = new HashMap<>();
+
+    // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node.
+    // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need
+    // an inner map keyed on RPCType.
+    // The RPC schemas defined in KIP-932 have a single group id per request. Hence, we cannot batch RPCs
+    // with different groupIds and therefore, another inner map keyed on groupId is needed.
+    // Finally, the value is a list of handlers.
+    private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>();
+
+    // Final object to serve synchronization needs.
+    private final Object nodeMapLock = new Object();
+
+    // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests.
+    // Mainly for testing and introspection purposes, to inspect the state of the nodeRPC map
+    // when generateRequests is called.
+    private Runnable generateCallback;
+
+    private static class BackoffManager {
+        private final int maxAttempts;
+        private int attempts;
+        private final ExponentialBackoff backoff;
+
+        BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
+            this.maxAttempts = maxAttempts;
+            this.backoff = new ExponentialBackoff(
+                initialBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                maxBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER
+            );
+        }
+
+        void incrementAttempt() {
+            attempts++;
+        }
+
+        void resetAttempts() {
+            attempts = 0;
+        }
+
+        boolean canAttempt() {
+            return attempts < maxAttempts;
+        }
+
+        long backOff() {
+            return this.backoff.backoff(attempts);
+        }
+    }
+
+    public enum RPCType {
+        READ,
+        WRITE,
+        DELETE,
+        SUMMARY,
+        UNKNOWN
+    }
+
+    public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        if (client == null) {
+            throw new IllegalArgumentException("KafkaClient must not be null.");
+        }
+        this.time = time == null ? Time.SYSTEM : time;

Review Comment:
   If we want default values for some constructor arguments, we should use a builder pattern instead of allowing nulls.
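   A rough sketch of that builder, based only on the constructor arguments visible in this diff (the Builder class itself is hypothetical):
   
   ```
   // Sketch: defaults live in the builder, so the constructor never sees nulls.
   public static class Builder {
       private final KafkaClient client;
       private final ShareCoordinatorMetadataCacheHelper cacheHelper;
       private Time time = Time.SYSTEM;   // default applied here, not via a null check
       private Timer timer;
   
       public Builder(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper) {
           this.client = Objects.requireNonNull(client, "KafkaClient must not be null.");
           this.cacheHelper = Objects.requireNonNull(cacheHelper, "cacheHelper must not be null.");
       }
   
       public Builder withTime(Time time) {
           this.time = Objects.requireNonNull(time);
           return this;
       }
   
       public Builder withTimer(Timer timer) {
           this.timer = Objects.requireNonNull(timer);
           return this;
       }
   
       public PersisterStateManager build() {
           return new PersisterStateManager(client, cacheHelper, time, timer);
       }
   }
   ```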



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -216,6 +216,16 @@ class BrokerMetadataPublisher(
           s"coordinator with local changes in $deltaName", t)
       }
 
+      try {
+        // Propagate the new image to the share coordinator.
+        if (shareCoordinator.isDefined) {

Review Comment:
   nit: use `foreach` (the Scala convention for optionals)



##########
share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of the {@link Persister} interface which is used by the
+ * group coordinator and share-partition leaders to manage the durable share-partition state.
+ * This implementation uses inter-broker RPCs to make requests to the share coordinator
+ * which is responsible for persisting the share-partition state.
+ */
+public class DefaultStatePersister implements Persister {
+    private final PersisterStateManager stateManager;
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class);
+
+    public DefaultStatePersister(PersisterStateManager stateManager) {
+        this.stateManager = stateManager;
+        this.stateManager.start();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (stateManager != null) {
+                stateManager.stop();
+            }
+        } catch (Exception e) {
+            log.error("Unable to stop state manager", e);
+        }
+    }
+
+    /**
+     * Used by the group coordinator to initialize the share-partition state.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request InitializeShareGroupStateParameters
+     * @return A completable future of InitializeShareGroupStateResult
+     */
+    public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException {
+        throw new RuntimeException("not implemented");
+    }
+
+    /**
+     * Used by share-partition leaders to write share-partition state to a share coordinator.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request WriteShareGroupStateParameters
+     * @return A completable future of WriteShareGroupStateResult
+     */
+    public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException {
+        validate(request);
+        GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+
+        Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<WriteShareGroupStateResponse> future = futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new WriteStateHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.stateEpoch(),
+                        partitionData.leaderEpoch(),
+                        partitionData.startOffset(),
+                        partitionData.stateBatches(),
+                        future, null)
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.WriteStateHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        return combinedFuture.thenApply(v -> {
+            List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream()
+                .map(topicId -> {
+                    List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream()
+                        .map(partitionFuture -> {
+                            int partition = partitionFuture.getKey();
+                            CompletableFuture<WriteShareGroupStateResponse> future = partitionFuture.getValue();
+                            try {
+                                WriteShareGroupStateResponse partitionResponse = future.get();
+                                return partitionResponse.data().results().get(0).partitions().stream()

Review Comment:
   Are we sure `results()` is always populated? Is the `get(0)` call safe? 
   
   Also why do we only care about the first result's partitions? Does every 
result have the same set of partitions?
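   For illustration, a defensive variant of the body of that try block (a sketch; whether an empty `results()` can actually occur depends on the share coordinator's response contract):
   
   ```
   // Sketch: guard against an empty results list before indexing into it.
   WriteShareGroupStateResponse partitionResponse = future.get();
   if (partitionResponse.data().results().isEmpty()) {
       return Collections.singletonList(PartitionFactory.newPartitionErrorData(
           partition,
           Errors.UNKNOWN_SERVER_ERROR.code(),
           "Empty results in write state response"));
   }
   return partitionResponse.data().results().get(0).partitions().stream()
       .map(partitionResult -> PartitionFactory.newPartitionErrorData(
           partitionResult.partition(),
           partitionResult.errorCode(),
           partitionResult.errorMessage()))
       .collect(Collectors.toList());
   ```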



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -0,0 +1,965 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates various handler classes corresponding to share
+ * state RPCs. It also holds an {@link InterBrokerSendThread} specialization
+ * which manages sending the RPC requests over the network.
+ * This class is for the exclusive purpose of being used with {@link DefaultStatePersister}
+ * but can be extended for other {@link Persister} implementations as well.
+ */
+public class PersisterStateManager {
+    private SendThread sender;
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    public static final long REQUEST_BACKOFF_MS = 1_000L;
+    public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_FIND_COORD_ATTEMPTS = 5;
+    private final Time time;
+    private final Timer timer;
+    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
+    // holds the set of share coord nodes for each RPC type which is currently sent but not completed
+    private final Map<RPCType, Set<Node>> inFlight = new HashMap<>();
+
+    // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node.
+    // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need
+    // an inner map keyed on RPCType.
+    // The RPC schemas defined in KIP-932 have a single group id per request. Hence, we cannot batch RPCs
+    // with different groupIds and therefore, another inner map keyed on groupId is needed.
+    // Finally, the value is a list of handlers.
+    private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>();
+
+    // Final object to serve synchronization needs.
+    private final Object nodeMapLock = new Object();
+
+    // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests.
+    // Mainly for testing and introspection purposes, to inspect the state of the nodeRPC map
+    // when generateRequests is called.
+    private Runnable generateCallback;
+
+    private static class BackoffManager {
+        private final int maxAttempts;
+        private int attempts;
+        private final ExponentialBackoff backoff;
+
+        BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
+            this.maxAttempts = maxAttempts;
+            this.backoff = new ExponentialBackoff(
+                initialBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                maxBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER
+            );
+        }
+
+        void incrementAttempt() {
+            attempts++;
+        }
+
+        void resetAttempts() {
+            attempts = 0;
+        }
+
+        boolean canAttempt() {
+            return attempts < maxAttempts;
+        }
+
+        long backOff() {
+            return this.backoff.backoff(attempts);
+        }
+    }
+
+    public enum RPCType {
+        READ,
+        WRITE,
+        DELETE,
+        SUMMARY,
+        UNKNOWN
+    }
+
+    public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        if (client == null) {
+            throw new IllegalArgumentException("KafkaClient must not be null.");
+        }
+        this.time = time == null ? Time.SYSTEM : time;
+        this.sender = new SendThread(
+            "PersisterStateManager",
+            client,
+            30_000,  //30 seconds
+            this.time,
+            true,
+            new Random(this.time.milliseconds()));
+        this.timer = timer;
+        this.cacheHelper = cacheHelper;
+    }
+
+    public void enqueue(PersisterStateManagerHandler handler) {
+        this.sender.enqueue(handler);
+    }
+
+    public void start() {
+        if (isStarted.compareAndSet(false, true)) {
+            this.sender.start();
+            isStarted.set(true);
+        }
+    }
+
+    public void stop() throws Exception {
+        if (isStarted.compareAndSet(true, false)) {
+            this.sender.shutdown();
+            Utils.closeQuietly(this.timer, "PersisterStateManager timer");
+        }
+    }
+
+    // test visibility
+    Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap() {
+        return nodeRPCMap;
+    }
+
+    public void setGenerateCallback(Runnable generateCallback) {
+        this.generateCallback = generateCallback;
+    }
+
+    /**
+     * Parent class of all RPCs. Uses the template pattern to implement core methods.
+     * Various child classes can extend this class to define how to handle RPC-specific
+     * responses, retries, batching etc.
+     * <p>
+     * Since the find coordinator RPC/lookup is a necessary pre-condition for all
+     * share state RPCs, the infra code for it is encapsulated in this class itself.
+     */
+    public abstract class PersisterStateManagerHandler implements RequestCompletionHandler {
+        protected Node coordinatorNode;
+        protected final String groupId;
+        protected final Uuid topicId;
+        protected final int partition;
+        private final BackoffManager findCoordBackoff;
+        protected final Logger log = LoggerFactory.getLogger(getClass());
+        private Consumer<ClientResponse> onCompleteCallback;
+
+        public PersisterStateManagerHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts
+        ) {
+            this.groupId = groupId;
+            this.topicId = topicId;
+            this.partition = partition;
+            this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);

Review Comment:
   A few fields are not initialized here (implicitly null). Can we initialize them explicitly in the constructor?
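   For example (a sketch; which defaults make sense is up to the author — both fields appear in the diff, but the no-op default is an assumption):
   
   ```
   // Sketch: make the implicitly-null fields explicit in the constructor.
   this.coordinatorNode = null;               // resolved later via the FindCoordinator flow
   this.onCompleteCallback = response -> { }; // no-op instead of a null check at call sites
   ```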



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -0,0 +1,965 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates various handler classes corresponding to share
+ * state RPCs. It also holds an {@link InterBrokerSendThread} specialization
+ * which manages sending the RPC requests over the network.
+ * This class is for the exclusive purpose of being used with {@link DefaultStatePersister}
+ * but can be extended for other {@link Persister} implementations as well.
+ */
+public class PersisterStateManager {
+    private SendThread sender;
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    public static final long REQUEST_BACKOFF_MS = 1_000L;
+    public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_FIND_COORD_ATTEMPTS = 5;
+    private final Time time;
+    private final Timer timer;
+    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
+    // holds the set of share coord nodes for each RPC type which is currently sent but not completed
+    private final Map<RPCType, Set<Node>> inFlight = new HashMap<>();
+
+    // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node.
+    // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need
+    // an inner map keyed on RPCType.
+    // The RPC schemas defined in KIP-932 have a single group id per request. Hence, we cannot batch RPCs
+    // with different groupIds and therefore, another inner map keyed on groupId is needed.
+    // Finally, the value is a list of handlers.
+    private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>();
+
+    // Final object to serve synchronization needs.
+    private final Object nodeMapLock = new Object();
+
+    // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests.
+    // Mainly for testing and introspection purposes, to inspect the state of the nodeRPC map
+    // when generateRequests is called.
+    private Runnable generateCallback;
+
+    private static class BackoffManager {
+        private final int maxAttempts;
+        private int attempts;
+        private final ExponentialBackoff backoff;
+
+        BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
+            this.maxAttempts = maxAttempts;
+            this.backoff = new ExponentialBackoff(
+                initialBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                maxBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER
+            );
+        }
+
+        void incrementAttempt() {
+            attempts++;
+        }
+
+        void resetAttempts() {
+            attempts = 0;
+        }
+
+        boolean canAttempt() {
+            return attempts < maxAttempts;
+        }
+
+        long backOff() {
+            return this.backoff.backoff(attempts);
+        }
+    }
+
+    public enum RPCType {
+        READ,
+        WRITE,
+        DELETE,
+        SUMMARY,
+        UNKNOWN
+    }
+
+    public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        if (client == null) {
+            throw new IllegalArgumentException("KafkaClient must not be null.");
+        }
+        this.time = time == null ? Time.SYSTEM : time;
+        this.sender = new SendThread(
+            "PersisterStateManager",
+            client,
+            30_000,  //30 seconds

Review Comment:
   Hmm, should this be configured? 
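   For reference, one way to avoid the literal is to thread the timeout through the constructor; the extra parameter below is hypothetical, and only the SendThread arguments come from this diff:
   
   ```
   // Sketch: request timeout as a constructor parameter instead of an inline literal.
   public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper,
                                Time time, Timer timer, int requestTimeoutMs) {
       this.time = time == null ? Time.SYSTEM : time;
       this.sender = new SendThread(
           "PersisterStateManager",
           client,
           requestTimeoutMs,   // previously the hard-coded 30_000
           this.time,
           true,
           new Random(this.time.milliseconds()));
       this.timer = timer;
       this.cacheHelper = cacheHelper;
   }
   ```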



##########
share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of the {@link Persister} interface which is used by the
+ * group coordinator and share-partition leaders to manage the durable share-partition state.
+ * This implementation uses inter-broker RPCs to make requests to the share coordinator
+ * which is responsible for persisting the share-partition state.
+ */
+public class DefaultStatePersister implements Persister {
+    private final PersisterStateManager stateManager;
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class);
+
+    public DefaultStatePersister(PersisterStateManager stateManager) {
+        this.stateManager = stateManager;
+        this.stateManager.start();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (stateManager != null) {
+                stateManager.stop();
+            }
+        } catch (Exception e) {
+            log.error("Unable to stop state manager", e);
+        }
+    }
+
+    /**
+     * Used by the group coordinator to initialize the share-partition state.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request InitializeShareGroupStateParameters
+     * @return A completable future of InitializeShareGroupStateResult
+     */
+    public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException {
+        throw new RuntimeException("not implemented");
+    }
+
+    /**
+     * Used by share-partition leaders to write share-partition state to a share coordinator.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request WriteShareGroupStateParameters
+     * @return A completable future of WriteShareGroupStateResult
+     */
+    public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException {
+        validate(request);
+        GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+
+        Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<WriteShareGroupStateResponse> future = futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new WriteStateHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.stateEpoch(),
+                        partitionData.leaderEpoch(),
+                        partitionData.startOffset(),
+                        partitionData.stateBatches(),
+                        future, null)
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.WriteStateHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        return combinedFuture.thenApply(v -> {
+            List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream()
+                .map(topicId -> {
+                    List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream()
+                        .map(partitionFuture -> {
+                            int partition = partitionFuture.getKey();
+                            CompletableFuture<WriteShareGroupStateResponse> future = partitionFuture.getValue();
+                            try {
+                                WriteShareGroupStateResponse partitionResponse = future.get();
+                                return partitionResponse.data().results().get(0).partitions().stream()
+                                    .map(partitionResult -> PartitionFactory.newPartitionErrorData(
+                                        partitionResult.partition(),
+                                        partitionResult.errorCode(),
+                                        partitionResult.errorMessage()))
+                                    .collect(Collectors.toList());
+                            } catch (InterruptedException | ExecutionException e) {
+                                log.error("Unexpected exception while writing data to share coordinator", e);
+                                return Collections.singletonList(PartitionFactory.newPartitionErrorData(
+                                    partition,
+                                    Errors.UNKNOWN_SERVER_ERROR.code(),   // No specific public error code exists for InterruptedException / ExecutionException
+                                    "Error writing state to share coordinator: " + e.getMessage())
+                                );
+                            }
+                        })
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+                    return new TopicData<>(topicId, partitionErrData);
+                })
+                .collect(Collectors.toList());
+            return new WriteShareGroupStateResult.Builder()
+                .setTopicsData(topicsData)
+                .build();
+        });
+    }
+
+    /**
+     * Used by share-partition leaders to read share-partition state from a share coordinator.
+     * This is an inter-broker RPC authorized as a cluster action.
+     *
+     * @param request ReadShareGroupStateParameters
+     * @return A completable future of ReadShareGroupStateResult
+     */
+    public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) throws IllegalArgumentException {
+        validate(request);
+        GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+        Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.ReadStateHandler> handlers = new ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<ReadShareGroupStateResponse> future = futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new ReadStateHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.leaderEpoch(),
+                        future,
+                        null)
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.ReadStateHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResult>
+        return combinedFuture.thenApply(v -> {

Review Comment:
   Can we move this function into a method? This will make it easier to read 
and probably easier to unit test the transformation of the read responses into 
a ReadShareGroupStateResult. Same for the write equivalent above.
   
   
   ```
   return combinedFuture.thenApply(__ -> readResponsesToResult(futureMap))
   ```
   
   Then we can unit test the logic in this block without dealing with other 
aspects of this readState method.
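   Sketched out for the write path (the read path would mirror it); the `writeResponsesToResult` and `errorsForTopic` names are hypothetical:
   
   ```
   // Sketch: writeState keeps only the orchestration...
   return combinedFuture.thenApply(v -> writeResponsesToResult(futureMap));
   
   // ...and the folding of per-partition futures moves into a testable method,
   // with errorsForTopic holding the try/catch logic already shown in the diff.
   private WriteShareGroupStateResult writeResponsesToResult(
       Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap) {
       List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream()
           .map(topicId -> new TopicData<>(topicId, errorsForTopic(futureMap.get(topicId))))
           .collect(Collectors.toList());
       return new WriteShareGroupStateResult.Builder()
           .setTopicsData(topicsData)
           .build();
   }
   ```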



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -0,0 +1,965 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.persister;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates various handler classes corresponding to share
+ * state RPCs. It also holds an {@link InterBrokerSendThread} specialization
+ * which manages sending the RPC requests over the network.
+ * This class is for the exclusive purpose of being used with {@link DefaultStatePersister}
+ * but can be extended for other {@link Persister} implementations as well.
+ */
+public class PersisterStateManager {

Review Comment:
   Ok, making sure I understand this class. PersisterStateManager owns a single SendThread which is used to send requests to the ShareCoordinator. The send thread has a queue of "handlers" which are the things responsible for building an RPC and handling the response.
   
   Handlers are added to the queue in the following ways:
   * by DefaultSharePersister (the "normal" flow)
   * periodically by PersisterTimerTask
   * recursively by the handlers when the coordinator is not found
   
   We read from the queue when SendThread wakes up and has its generateRequests 
called by the network layer. We first check the queue for FIND_COORDINATORS and 
non-batchable RPCs and add those to the list of requests to send. Next we go 
through the batchable RPCs and add what we can to the list of outgoing requests.
   
   A few questions:
   
   1) What is the lifetime of the handler objects? Are they long lived, or 
scoped to a single RPC?
   2) It seems like requests may be reordered since we are putting them into a 
map. Is this okay? I think order should be maintained for a given Coordinator + 
RPC Type + group, so maybe it is.
   3) What happens to the pending requests when the coordinator changes?
   4) What is the purpose of the periodic wakeup?
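   For readers following the batching description above, the nested-map insert it implies looks roughly like this (a sketch built only from the nodeRPCMap type declared in the diff):
   
   ```
   // Sketch: how a batchable handler slots into the Node -> RPCType -> groupId grouping.
   nodeRPCMap
       .computeIfAbsent(coordinatorNode, node -> new HashMap<>())
       .computeIfAbsent(RPCType.WRITE, type -> new HashMap<>())
       .computeIfAbsent(groupId, group -> new LinkedList<>())
       .add(handler);
   ```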
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
