smjn commented on code in PR #17270:
URL: https://github.com/apache/kafka/pull/17270#discussion_r1799703661
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -521,17 +525,30 @@ public void onElection(int partitionIndex, int
partitionLeaderEpoch) {
@Override
public void onResignation(int partitionIndex, OptionalInt
partitionLeaderEpoch) {
+ throwIfNotActive();
runtime.scheduleUnloadOperation(
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME,
partitionIndex),
partitionLeaderEpoch
);
}
+ @Override
+ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
+ throwIfNotActive();
+ this.runtime.onNewMetadataImage(newImage, delta);
+ }
+
private TopicPartition topicPartitionFor(SharePartitionKey key) {
return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME,
partitionFor(key.toString()));
}
private static <P> boolean isEmpty(List<P> list) {
return list == null || list.isEmpty();
}
+
+ private void throwIfNotActive() {
Review Comment:
@AndrewJSchofield
It is - line 284, 404
just that it is not being used directly but the check is the same. We are
just wrapping the response as a completable future to easier handling at
KafkaApis level
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]