AndrewJSchofield commented on code in PR #17270:
URL: https://github.com/apache/kafka/pull/17270#discussion_r1799605303
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -521,17 +525,30 @@ public void onElection(int partitionIndex, int partitionLeaderEpoch) {
@Override
public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
+ throwIfNotActive();
runtime.scheduleUnloadOperation(
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex),
partitionLeaderEpoch
);
}
+ @Override
+ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+ throwIfNotActive();
+ this.runtime.onNewMetadataImage(newImage, delta);
+ }
+
private TopicPartition topicPartitionFor(SharePartitionKey key) {
return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key.toString()));
}
private static <P> boolean isEmpty(List<P> list) {
return list == null || list.isEmpty();
}
+
+ private void throwIfNotActive() {
Review Comment:
This method probably ought to be guarding the `readState` and `writeState`
methods too.
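
A minimal sketch of what that could look like, under stated assumptions: `GuardedStateService` is a hypothetical stand-in rather than the real `ShareCoordinatorService`, the `isActive` flag and the `IllegalStateException` are guesses because the body of `throwIfNotActive` is not shown in this hunk, and the `readState`/`writeState` signatures are simplified to plain strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the service; not the real ShareCoordinatorService.
class GuardedStateService {
    // Assumed flag: set on startup, cleared on shutdown.
    private final AtomicBoolean isActive = new AtomicBoolean(false);

    void startup() {
        isActive.set(true);
    }

    void shutdown() {
        isActive.set(false);
    }

    // Assumed body: the hunk does not show what the real method checks or throws.
    private void throwIfNotActive() {
        if (!isActive.get()) {
            throw new IllegalStateException("Share coordinator is not active.");
        }
    }

    // Simplified signature (assumption): the real method takes request objects.
    CompletableFuture<String> readState(String key) {
        throwIfNotActive(); // same guard as onResignation and onNewMetadataImage
        return CompletableFuture.completedFuture("state-for-" + key);
    }

    // Simplified signature (assumption): the real method takes request objects.
    CompletableFuture<Void> writeState(String key, String value) {
        throwIfNotActive(); // reject writes before startup and after shutdown
        return CompletableFuture.completedFuture(null);
    }
}
```

If these methods return futures in the real service, it may be preferable to return a failed future or an error response instead of throwing synchronously, so that callers only have one failure path to handle.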