brandboat commented on code in PR #19803:
URL: https://github.com/apache/kafka/pull/19803#discussion_r2106238196
##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -148,56 +147,53 @@ DescribeTopicPartitionsResponseData describeTopicResponse(
boolean ignoreTopicsWithExceptions);
static Cluster toCluster(String clusterId, MetadataImage image) {
- Map<Integer, List<Node>> brokerToNodes = new HashMap<>();
- image.cluster().brokers().values().stream()
- .filter(broker -> !broker.fenced())
- .forEach(broker -> brokerToNodes.put(broker.id(), broker.nodes()));
+ Map<Integer, Node> nodesById = image.cluster().brokers().values().stream()
+ .collect(Collectors.toMap(BrokerRegistration::id, broker -> broker.nodes().get(0)));
Review Comment:
Pardon me, I meant "replicas", not leader.
Maybe an example will help clarify things:
Let’s say we have 3 brokers — broker0, broker1, and broker2 — and a topic
called my-topic. Partition 0 of this topic has 3 replicas, one on each broker.
Now suppose broker2 unexpectedly shuts down.
Here’s what happens next:
1. DynamicTopicClusterQuotaPublisher#onMetadataUpdate gets triggered
2. That leads to MetadataCache#toCluster being called
3. In toCluster, nodesById gets constructed without the fenced broker
(broker2 is filtered out)
4. Then MetadataCache#toArray uses this nodesById, but in the TopicsImage
the PartitionRegistration (my-topic partition-0) still has replicas [0, 1, 2].
Since broker2 isn’t in nodesById, the lookup for replica 2 returns null and
we get a NullPointerException, kaboom!
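The scenario above can be reproduced in isolation. This is only a minimal sketch: `Broker`, `Node`, `toArrayNaive`, and `toArraySkippingUnknown` are hypothetical stand-ins written for illustration, not the Kafka classes or the actual `MetadataCache` code. It assumes the map is built from unfenced brokers only and that the replica list still contains the fenced broker's id.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class FencedReplicaSketch {
    // Hypothetical stand-in for BrokerRegistration (not the Kafka class).
    static final class Broker {
        final int id;
        final boolean fenced;
        Broker(int id, boolean fenced) { this.id = id; this.fenced = fenced; }
    }

    // Hypothetical stand-in for Node (not the Kafka class).
    static final class Node {
        final int id;
        Node(int id) { this.id = id; }
    }

    // Step 3: build the id -> node map, leaving out fenced brokers.
    static Map<Integer, Node> nodesById(List<Broker> brokers) {
        return brokers.stream()
            .filter(b -> !b.fenced)
            .collect(Collectors.toMap(b -> b.id, b -> new Node(b.id)));
    }

    // Step 4, naive version: a replica id that is missing from the map
    // produces a null entry in the resulting array.
    static Node[] toArrayNaive(int[] replicas, Map<Integer, Node> nodesById) {
        Node[] out = new Node[replicas.length];
        for (int i = 0; i < replicas.length; i++) {
            out[i] = nodesById.get(replicas[i]);
        }
        return out;
    }

    // One possible guard (an assumption, not necessarily the right fix for
    // the PR): drop replica ids that have no known node.
    static Node[] toArraySkippingUnknown(int[] replicas, Map<Integer, Node> nodesById) {
        return Arrays.stream(replicas)
            .mapToObj(id -> nodesById.get(id))
            .filter(Objects::nonNull)
            .toArray(Node[]::new);
    }

    public static void main(String[] args) {
        // broker2 is fenced, but partition 0 still lists replicas [0, 1, 2].
        List<Broker> brokers = List.of(
            new Broker(0, false), new Broker(1, false), new Broker(2, true));
        Map<Integer, Node> byId = nodesById(brokers);
        int[] replicas = {0, 1, 2};

        Node[] naive = toArrayNaive(replicas, byId);
        try {
            // Dereferencing the null entry for replica 2 throws.
            System.out.println(naive[2].id);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for replica 2");
        }

        System.out.println(toArraySkippingUnknown(replicas, byId).length);
    }
}
```

Whether fenced replicas should be skipped, mapped to a placeholder node, or kept in the map is a design decision for the PR; the sketch only shows why a map that omits a fenced broker cannot be indexed blindly with the replica list.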