mumrah commented on code in PR #17270: URL: https://github.com/apache/kafka/pull/17270#discussion_r1816970631
########## core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import scala.jdk.javaapi.CollectionConverters; +import scala.jdk.javaapi.OptionConverters; + +public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper { + private final KRaftMetadataCache metadataCache; Review Comment: Do we really need a reference to KRaftMetadataCache, or is MetadataCache sufficient? I would prefer this class to be in the share coordinator module rather than core. 
########## share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The default implementation of the {@link Persister} interface which is used by the + * group coordinator and share-partition leaders to manage the durable share-partition state. + * This implementation uses inter-broker RPCs to make requests to the share coordinator + * which is responsible for persisting the share-partition state. 
+ */ +public class DefaultStatePersister implements Persister { + private final PersisterStateManager stateManager; + + private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class); + + public DefaultStatePersister(PersisterStateManager stateManager) { + this.stateManager = stateManager; + this.stateManager.start(); + } + + @Override + public void stop() { + try { + if (stateManager != null) { + stateManager.stop(); + } + } catch (Exception e) { + log.error("Unable to stop state manager", e); + } + } + + /** + * Used by the group coordinator to initialize the share-partition state. + * This is an inter-broker RPC authorized as a cluster action. + * + * @param request InitializeShareGroupStateParameters + * @return A completable future of InitializeShareGroupStateResult + */ + public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException { + throw new RuntimeException("not implemented"); + } + + /** + * Used by share-partition leaders to write share-partition state to a share coordinator. + * This is an inter-broker RPC authorized as a cluster action. 
+ * + * @param request WriteShareGroupStateParameters + * @return A completable future of WriteShareGroupStateResult + */ + public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException { + validate(request); + GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + + Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<WriteShareGroupStateResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new WriteStateHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.stateEpoch(), + partitionData.leaderEpoch(), + partitionData.startOffset(), + partitionData.stateBatches(), + future, null) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.WriteStateHandler::result) + .toArray(CompletableFuture[]::new)); + + return combinedFuture.thenApply(v -> { + List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream() + .map(topicId -> { + List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream() + .map(partitionFuture -> { + int partition = partitionFuture.getKey(); + CompletableFuture<WriteShareGroupStateResponse> future = partitionFuture.getValue(); + try { + WriteShareGroupStateResponse partitionResponse = future.get(); + return 
partitionResponse.data().results().get(0).partitions().stream() + .map(partitionResult -> PartitionFactory.newPartitionErrorData( Review Comment: Here we use newPartitionErrorData, but in readState we use ``` PartitionFactory.newPartitionAllData( partition, -1, -1, Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException "Error reading state from share coordinator: " + e.getMessage(), Collections.emptyList())``` why the difference in error handling? ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -0,0 +1,965 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.server.util.InterBrokerSendThread; +import org.apache.kafka.server.util.RequestAndCompletionHandler; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class encapsulates various handler classes corresponding to share + * state RPCs. It also holds an {@link InterBrokerSendThread} specialization + * which manages the sending the RPC requests over the network. + * This class is for the exclusive purpose of being used with {@link DefaultStatePersister} + * but can be extended for other {@link Persister} implementations as well. + */ +public class PersisterStateManager { + private SendThread sender; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + public static final long REQUEST_BACKOFF_MS = 1_000L; + public static final long REQUEST_BACKOFF_MAX_MS = 30_000L; + private static final int MAX_FIND_COORD_ATTEMPTS = 5; + private final Time time; + private final Timer timer; + private final ShareCoordinatorMetadataCacheHelper cacheHelper; + // holds the set of share coord nodes for each RPC type which is currently sent but not completed + private final Map<RPCType, Set<Node>> inFlight = new HashMap<>(); + + // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node. + // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need + // RPCType'd key inner map. + // The RPC schemas defined in kip-932 have a single group id per request. Hence, we cannot batch RPCs + // with different groupIds and therefore, another inner map keyed on groupId is needed. 
+ // Finally, the value is a list of handlers + private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>(); + + // Final object to serve synchronization needs. + private final Object nodeMapLock = new Object(); + + // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests. + // Mainly for testing and introspection purpose to inspect the state of the nodeRPC map + // when generateRequests is called. + private Runnable generateCallback; + + private static class BackoffManager { + private final int maxAttempts; + private int attempts; + private final ExponentialBackoff backoff; + + BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) { + this.maxAttempts = maxAttempts; + this.backoff = new ExponentialBackoff( + initialBackoffMs, + CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, + maxBackoffMs, + CommonClientConfigs.RETRY_BACKOFF_JITTER + ); + } + + void incrementAttempt() { + attempts++; + } + + void resetAttempts() { + attempts = 0; + } + + boolean canAttempt() { + return attempts < maxAttempts; + } + + long backOff() { + return this.backoff.backoff(attempts); + } + } + + public enum RPCType { + READ, + WRITE, + DELETE, + SUMMARY, + UNKNOWN + } + + public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) { + if (client == null) { + throw new IllegalArgumentException("Kafkaclient must not be null."); + } + this.time = time == null ? Time.SYSTEM : time; Review Comment: If we want some default constructor things, we should use a builder pattern instead of allowing nulls. ########## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ########## @@ -216,6 +216,16 @@ class BrokerMetadataPublisher( s"coordinator with local changes in $deltaName", t) } + try { + // Propagate the new image to the share coordinator. 
+ if (shareCoordinator.isDefined) { Review Comment: nit: use `foreach` (the scala convention for optionals) ########## share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The default implementation of the {@link Persister} interface which is used by the + * group coordinator and share-partition leaders to manage the durable share-partition state. 
+ * This implementation uses inter-broker RPCs to make requests to the share coordinator + * which is responsible for persisting the share-partition state. + */ +public class DefaultStatePersister implements Persister { + private final PersisterStateManager stateManager; + + private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class); + + public DefaultStatePersister(PersisterStateManager stateManager) { + this.stateManager = stateManager; + this.stateManager.start(); + } + + @Override + public void stop() { + try { + if (stateManager != null) { + stateManager.stop(); + } + } catch (Exception e) { + log.error("Unable to stop state manager", e); + } + } + + /** + * Used by the group coordinator to initialize the share-partition state. + * This is an inter-broker RPC authorized as a cluster action. + * + * @param request InitializeShareGroupStateParameters + * @return A completable future of InitializeShareGroupStateResult + */ + public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException { + throw new RuntimeException("not implemented"); + } + + /** + * Used by share-partition leaders to write share-partition state to a share coordinator. + * This is an inter-broker RPC authorized as a cluster action. 
+ * + * @param request WriteShareGroupStateParameters + * @return A completable future of WriteShareGroupStateResult + */ + public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException { + validate(request); + GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + + Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<WriteShareGroupStateResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new WriteStateHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.stateEpoch(), + partitionData.leaderEpoch(), + partitionData.startOffset(), + partitionData.stateBatches(), + future, null) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.WriteStateHandler::result) + .toArray(CompletableFuture[]::new)); + + return combinedFuture.thenApply(v -> { + List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream() + .map(topicId -> { + List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream() + .map(partitionFuture -> { + int partition = partitionFuture.getKey(); + CompletableFuture<WriteShareGroupStateResponse> future = partitionFuture.getValue(); + try { + WriteShareGroupStateResponse partitionResponse = future.get(); + return 
partitionResponse.data().results().get(0).partitions().stream() Review Comment: Are we sure `results()` is always populated? Is the `get(0)` call safe? Also why do we only care about the first result's partitions? Does every result have the same set of partitions? ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -0,0 +1,965 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.server.util.InterBrokerSendThread; +import org.apache.kafka.server.util.RequestAndCompletionHandler; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class encapsulates various handler classes corresponding to share + * state RPCs. It also holds an {@link InterBrokerSendThread} specialization + * which manages the sending the RPC requests over the network. + * This class is for the exclusive purpose of being used with {@link DefaultStatePersister} + * but can be extended for other {@link Persister} implementations as well. + */ +public class PersisterStateManager { + private SendThread sender; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + public static final long REQUEST_BACKOFF_MS = 1_000L; + public static final long REQUEST_BACKOFF_MAX_MS = 30_000L; + private static final int MAX_FIND_COORD_ATTEMPTS = 5; + private final Time time; + private final Timer timer; + private final ShareCoordinatorMetadataCacheHelper cacheHelper; + // holds the set of share coord nodes for each RPC type which is currently sent but not completed + private final Map<RPCType, Set<Node>> inFlight = new HashMap<>(); + + // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node. + // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need + // RPCType'd key inner map. + // The RPC schemas defined in kip-932 have a single group id per request. Hence, we cannot batch RPCs + // with different groupIds and therefore, another inner map keyed on groupId is needed. 
+ // Finally, the value is a list of handlers + private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>(); + + // Final object to serve synchronization needs. + private final Object nodeMapLock = new Object(); + + // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests. + // Mainly for testing and introspection purpose to inspect the state of the nodeRPC map + // when generateRequests is called. + private Runnable generateCallback; + + private static class BackoffManager { + private final int maxAttempts; + private int attempts; + private final ExponentialBackoff backoff; + + BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) { + this.maxAttempts = maxAttempts; + this.backoff = new ExponentialBackoff( + initialBackoffMs, + CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, + maxBackoffMs, + CommonClientConfigs.RETRY_BACKOFF_JITTER + ); + } + + void incrementAttempt() { + attempts++; + } + + void resetAttempts() { + attempts = 0; + } + + boolean canAttempt() { + return attempts < maxAttempts; + } + + long backOff() { + return this.backoff.backoff(attempts); + } + } + + public enum RPCType { + READ, + WRITE, + DELETE, + SUMMARY, + UNKNOWN + } + + public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) { + if (client == null) { + throw new IllegalArgumentException("Kafkaclient must not be null."); + } + this.time = time == null ? 
Time.SYSTEM : time; + this.sender = new SendThread( + "PersisterStateManager", + client, + 30_000, //30 seconds + this.time, + true, + new Random(this.time.milliseconds())); + this.timer = timer; + this.cacheHelper = cacheHelper; + } + + public void enqueue(PersisterStateManagerHandler handler) { + this.sender.enqueue(handler); + } + + public void start() { + if (isStarted.compareAndSet(false, true)) { + this.sender.start(); + isStarted.set(true); + } + } + + public void stop() throws Exception { + if (isStarted.compareAndSet(true, false)) { + this.sender.shutdown(); + Utils.closeQuietly(this.timer, "PersisterStateManager timer"); + } + } + + // test visibility + Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap() { + return nodeRPCMap; + } + + public void setGenerateCallback(Runnable generateCallback) { + this.generateCallback = generateCallback; + } + + /** + * Parent class of all RPCs. Uses template pattern to implement core methods. + * Various child classes can extend this class to define how to handle RPC specific + * responses, retries, batching etc. + * <p> + * Since the find coordinator RPC/lookup is a necessary pre-condition for all + * share state RPCs, the infra code for it is encapsulated in this class itself. 
+ */ + public abstract class PersisterStateManagerHandler implements RequestCompletionHandler { + protected Node coordinatorNode; + protected final String groupId; + protected final Uuid topicId; + protected final int partition; + private final BackoffManager findCoordBackoff; + protected final Logger log = LoggerFactory.getLogger(getClass()); + private Consumer<ClientResponse> onCompleteCallback; + + public PersisterStateManagerHandler( + String groupId, + Uuid topicId, + int partition, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts + ) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); Review Comment: A few fields are not initialized here (implicit null). Can we explicitly initialize them here? ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -0,0 +1,965 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.server.util.InterBrokerSendThread; +import org.apache.kafka.server.util.RequestAndCompletionHandler; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class encapsulates various handler classes corresponding to share + * state RPCs. It also holds an {@link InterBrokerSendThread} specialization + * which manages the sending the RPC requests over the network. + * This class is for the exclusive purpose of being used with {@link DefaultStatePersister} + * but can be extended for other {@link Persister} implementations as well. + */ +public class PersisterStateManager { + private SendThread sender; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + public static final long REQUEST_BACKOFF_MS = 1_000L; + public static final long REQUEST_BACKOFF_MAX_MS = 30_000L; + private static final int MAX_FIND_COORD_ATTEMPTS = 5; + private final Time time; + private final Timer timer; + private final ShareCoordinatorMetadataCacheHelper cacheHelper; + // holds the set of share coord nodes for each RPC type which is currently sent but not completed + private final Map<RPCType, Set<Node>> inFlight = new HashMap<>(); + + // Mapping for batchable RPCs. The top level grouping is based on destination share coordinator node. + // Since kafkaApis for each RPC type are separate, we cannot batch different types of RPCs. Hence, we need + // RPCType'd key inner map. + // The RPC schemas defined in kip-932 have a single group id per request. Hence, we cannot batch RPCs + // with different groupIds and therefore, another inner map keyed on groupId is needed. 
+ // Finally, the value is a list of handlers + private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap<>(); + + // Final object to serve synchronization needs. + private final Object nodeMapLock = new Object(); + + // Called when the generateRequests method is executed by InterBrokerSendThread, returning requests. + // Mainly for testing and introspection purpose to inspect the state of the nodeRPC map + // when generateRequests is called. + private Runnable generateCallback; + + private static class BackoffManager { + private final int maxAttempts; + private int attempts; + private final ExponentialBackoff backoff; + + BackoffManager(int maxAttempts, long initialBackoffMs, long maxBackoffMs) { + this.maxAttempts = maxAttempts; + this.backoff = new ExponentialBackoff( + initialBackoffMs, + CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, + maxBackoffMs, + CommonClientConfigs.RETRY_BACKOFF_JITTER + ); + } + + void incrementAttempt() { + attempts++; + } + + void resetAttempts() { + attempts = 0; + } + + boolean canAttempt() { + return attempts < maxAttempts; + } + + long backOff() { + return this.backoff.backoff(attempts); + } + } + + public enum RPCType { + READ, + WRITE, + DELETE, + SUMMARY, + UNKNOWN + } + + public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) { + if (client == null) { + throw new IllegalArgumentException("Kafkaclient must not be null."); + } + this.time = time == null ? Time.SYSTEM : time; + this.sender = new SendThread( + "PersisterStateManager", + client, + 30_000, //30 seconds Review Comment: Hmm, should this be configured? ########## share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The default implementation of the {@link Persister} interface which is used by the + * group coordinator and share-partition leaders to manage the durable share-partition state. + * This implementation uses inter-broker RPCs to make requests to the share coordinator + * which is responsible for persisting the share-partition state. 
+ */ +public class DefaultStatePersister implements Persister { + private final PersisterStateManager stateManager; + + private static final Logger log = LoggerFactory.getLogger(DefaultStatePersister.class); + + public DefaultStatePersister(PersisterStateManager stateManager) { + this.stateManager = stateManager; + this.stateManager.start(); + } + + @Override + public void stop() { + try { + if (stateManager != null) { + stateManager.stop(); + } + } catch (Exception e) { + log.error("Unable to stop state manager", e); + } + } + + /** + * Used by the group coordinator to initialize the share-partition state. + * This is an inter-broker RPC authorized as a cluster action. + * + * @param request InitializeShareGroupStateParameters + * @return A completable future of InitializeShareGroupStateResult + */ + public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException { + throw new RuntimeException("not implemented"); + } + + /** + * Used by share-partition leaders to write share-partition state to a share coordinator. + * This is an inter-broker RPC authorized as a cluster action. 
+ * + * @param request WriteShareGroupStateParameters + * @return A completable future of WriteShareGroupStateResult + */ + public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException { + validate(request); + GroupTopicPartitionData<PartitionStateBatchData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + + Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<WriteShareGroupStateResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new WriteStateHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.stateEpoch(), + partitionData.leaderEpoch(), + partitionData.startOffset(), + partitionData.stateBatches(), + future, null) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.WriteStateHandler::result) + .toArray(CompletableFuture[]::new)); + + return combinedFuture.thenApply(v -> { + List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream() + .map(topicId -> { + List<PartitionErrorData> partitionErrData = futureMap.get(topicId).entrySet().stream() + .map(partitionFuture -> { + int partition = partitionFuture.getKey(); + CompletableFuture<WriteShareGroupStateResponse> future = partitionFuture.getValue(); + try { + WriteShareGroupStateResponse partitionResponse = future.get(); + return 
partitionResponse.data().results().get(0).partitions().stream() + .map(partitionResult -> PartitionFactory.newPartitionErrorData( + partitionResult.partition(), + partitionResult.errorCode(), + partitionResult.errorMessage())) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + log.error("Unexpected exception while writing data to share coordinator", e); + return Collections.singletonList(PartitionFactory.newPartitionErrorData( + partition, + Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException + "Error writing state to share coordinator: " + e.getMessage()) + ); + } + }) + .flatMap(List::stream) + .collect(Collectors.toList()); + return new TopicData<>(topicId, partitionErrData); + }) + .collect(Collectors.toList()); + return new WriteShareGroupStateResult.Builder() + .setTopicsData(topicsData) + .build(); + }); + } + + /** + * Used by share-partition leaders to read share-partition state from a share coordinator. + * This is an inter-broker RPC authorized as a cluster action. 
+ * + * @param request ReadShareGroupStateParameters + * @return A completable future of ReadShareGroupStateResult + */ + public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) throws IllegalArgumentException { + validate(request); + GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.ReadStateHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<ReadShareGroupStateResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new ReadStateHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.leaderEpoch(), + future, + null) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + // Combine all futures into a single CompletableFuture<Void> + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.ReadStateHandler::result) + .toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResult> + return combinedFuture.thenApply(v -> { Review Comment: Can we move this function into a method? This will make it easier to read and probably easier to unit test the transformation of the read responses into a ReadShareGroupStateResult. Same for the write equivalent above. 
``` return combinedFuture.thenApply(__ -> readResponsesToResult(futureMap)) ``` Then we can unit test the logic in this block without dealing with other aspects of this readState method. ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -0,0 +1,965 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.share.persister; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.WriteShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.server.util.InterBrokerSendThread; +import org.apache.kafka.server.util.RequestAndCompletionHandler; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class encapsulates various handler classes corresponding to share + * state RPCs. It also holds an {@link InterBrokerSendThread} specialization + * which manages the sending of the RPC requests over the network. + * This class is for the exclusive purpose of being used with {@link DefaultStatePersister} + * but can be extended for other {@link Persister} implementations as well. + */ +public class PersisterStateManager { Review Comment: Ok, making sure I understand this class. PersisterStateManager owns a single SendThread which is used to send requests to the ShareCoordinator. The send thread has a queue of "handlers" which are the things responsible for building an RPC and handling the response. Handlers are added to the queue in the following ways: * by DefaultSharePersister (the "normal" flow) * periodically by PersisterTimerTask * recursively by the handlers when the coordinator is not found We read from the queue when SendThread wakes up and has its generateRequests called by the network layer. We first check the queue for FIND_COORDINATORS and non-batchable RPCs and add those to the list of requests to send. Next we go through the batchable RPCs and add what we can to the list of outgoing requests. A few questions: 1) What is the lifetime of the handler objects? Are they long lived, or scoped to a single RPC? 2) It seems like requests may be reordered since we are putting them into a map. Is this okay? 
I think order should be maintained for a given Coordinator + RPC Type + group, so maybe it is. 3) What happens to the pending requests when the coordinator changes? 4) What is the purpose of the periodic wakeup? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
