jsancio commented on code in PR #18304:
URL: https://github.com/apache/kafka/pull/18304#discussion_r1902143080
##########
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java:
##########
@@ -17,7 +17,7 @@
package org.apache.kafka.controller.metrics;
-import org.apache.kafka.raft.KafkaRaftClient;
+import org.apache.kafka.raft.internals.ExternalKRaftMetricIgnoredStaticVoters;
Review Comment:
The `internals` package should not be imported outside the `raft` module.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##########
@@ -184,6 +141,59 @@ public KafkaRaftMetrics(Metrics metrics, String
metricGrpPrefix, QuorumState sta
);
}
+ public void initialize(QuorumState state) {
Review Comment:
Let's file a Jira to remove this refactor and assign it to me.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetrics.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
Review Comment:
You can also move this "composite" metrics to `core`. If you do that then
you can specify the broker server metrics and controller server metrics
concretely and remove the need for them to "implement"
`ExternalKRaftMetricIgnoredStaticVoters`.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetricIgnoredStaticVoters.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
Review Comment:
This is part of `raft` public interface. Move it to the
`org.apache.kafka.raft` package.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -87,14 +91,23 @@ public KRaftControlRecordStateMachine(
RecordSerde<?> serde,
BufferSupplier bufferSupplier,
int maxBatchSizeBytes,
- LogContext logContext
+ LogContext logContext,
+ KafkaRaftMetrics kafkaRaftMetrics,
+ ExternalKRaftMetrics externalKRaftMetrics
) {
this.log = log;
this.voterSetHistory = new VoterSetHistory(staticVoterSet);
this.serde = serde;
this.bufferSupplier = bufferSupplier;
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.logger = logContext.logger(this.getClass());
+ this.kafkaRaftMetrics = kafkaRaftMetrics;
+ this.externalKRaftMetrics = externalKRaftMetrics;
+ this.staticVoterSet = staticVoterSet != null ?
Optional.of(staticVoterSet) : Optional.empty();
+
+ if (staticVoterSet != null) {
Review Comment:
Why are you checking for `null`? If you follow all code paths
`staticVoterSet` is never `null`. Did you mean to use
`staticVoterSet.ifPresent(voters ->
kafkaRaftMetrics.updateNumVoters(voters.size())`?
##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -87,14 +91,23 @@ public KRaftControlRecordStateMachine(
RecordSerde<?> serde,
BufferSupplier bufferSupplier,
int maxBatchSizeBytes,
- LogContext logContext
+ LogContext logContext,
+ KafkaRaftMetrics kafkaRaftMetrics,
+ ExternalKRaftMetrics externalKRaftMetrics
) {
this.log = log;
this.voterSetHistory = new VoterSetHistory(staticVoterSet);
this.serde = serde;
this.bufferSupplier = bufferSupplier;
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.logger = logContext.logger(this.getClass());
+ this.kafkaRaftMetrics = kafkaRaftMetrics;
+ this.externalKRaftMetrics = externalKRaftMetrics;
+ this.staticVoterSet = staticVoterSet != null ?
Optional.of(staticVoterSet) : Optional.empty();
Review Comment:
You can reuse `Optional.ofNullable`.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -682,13 +690,15 @@ private void transitionToCandidate(long currentTimeMs) {
private void transitionToUnattached(int epoch) {
quorum.transitionToUnattached(epoch);
maybeFireLeaderChange();
+ kafkaRaftMetrics.removeLeaderMetrics();
resetConnections();
}
private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request
since this node is resigning"));
quorum.transitionToResigned(preferredSuccessors);
+ kafkaRaftMetrics.removeLeaderMetrics();
Review Comment:
Okay. Need to check that these are all of the possible transitions from the
leader state.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetrics.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+public class ExternalKRaftMetrics {
Review Comment:
If you move this to `core` and use this name for the interface, then you can
call this type something like `DefaultExternalKRaftMetrics`.
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -217,6 +221,12 @@ public void resetAddVoterHandlerState(
.complete(RaftUtil.addVoterResponse(error, message))
);
addVoterHandlerState = state;
+ if (addVoterHandlerState.isPresent() ||
removeVoterHandlerState.isPresent()) {
+ kafkaRaftMetrics.updateUncommittedVoterChange(1);
+ } else {
+ kafkaRaftMetrics.updateUncommittedVoterChange(0);
+ }
Review Comment:
Code duplication. Let's reuse this code by moving this to a private method.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##########
@@ -208,13 +230,35 @@ public void updateElectionStartMs(long currentTimeMs) {
electionStartMs = OptionalLong.of(currentTimeMs);
}
+ public void updateNumVoters(int numVoters) {
+ this.numVoters = numVoters;
+ }
+
+ public void updateNumObservers(int numObservers) {
+ this.numObservers = numObservers;
+ }
+
+ public void updateUncommittedVoterChange(int uncommittedVoterChange) {
+ this.uncommittedVoterChange = uncommittedVoterChange;
+ }
+
public void maybeUpdateElectionLatency(long currentTimeMs) {
if (electionStartMs.isPresent()) {
electionTimeSensor.record(currentTimeMs -
electionStartMs.getAsLong(), currentTimeMs);
electionStartMs = OptionalLong.empty();
}
}
+ public void addLeaderMetrics() {
+ metrics.addMetric(numObserversMetricName, (Gauge<Integer>) (config,
now) -> numObservers);
+ metrics.addMetric(uncommittedVoterChangeMetricName, (Gauge<Integer>)
(config, now) -> uncommittedVoterChange);
+ }
+
+ public void removeLeaderMetrics() {
+ metrics.removeMetric(numObserversMetricName);
+ metrics.removeMetric(uncommittedVoterChangeMetricName);
+ }
Review Comment:
Don't you need to reset the value of those metrics after losing leadership?
##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -430,7 +431,8 @@ public RaftClientTestContext build() throws IOException {
client.initialize(
staticVoterAddressMap,
quorumStateStore,
- metrics
+ metrics,
+ new ExternalKRaftMetrics(null, null)
Review Comment:
Where are the test for this metrics like you did for the new metrics in
`KafkaRaftMetrics`?
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##########
@@ -208,13 +230,35 @@ public void updateElectionStartMs(long currentTimeMs) {
electionStartMs = OptionalLong.of(currentTimeMs);
}
+ public void updateNumVoters(int numVoters) {
+ this.numVoters = numVoters;
+ }
+
+ public void updateNumObservers(int numObservers) {
+ this.numObservers = numObservers;
+ }
+
+ public void updateUncommittedVoterChange(int uncommittedVoterChange) {
+ this.uncommittedVoterChange = uncommittedVoterChange;
+ }
+
public void maybeUpdateElectionLatency(long currentTimeMs) {
if (electionStartMs.isPresent()) {
electionTimeSensor.record(currentTimeMs -
electionStartMs.getAsLong(), currentTimeMs);
electionStartMs = OptionalLong.empty();
}
}
+ public void addLeaderMetrics() {
+ metrics.addMetric(numObserversMetricName, (Gauge<Integer>) (config,
now) -> numObservers);
+ metrics.addMetric(uncommittedVoterChangeMetricName, (Gauge<Integer>)
(config, now) -> uncommittedVoterChange);
Review Comment:
Do you need the explicit cast to `Gauge<Integer>` in both of this cases? If
so, why?
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -234,6 +244,11 @@ public void resetRemoveVoterHandlerState(
.complete(RaftUtil.removeVoterResponse(error, message))
);
removeVoterHandlerState = state;
+ if (addVoterHandlerState.isPresent() ||
removeVoterHandlerState.isPresent()) {
+ kafkaRaftMetrics.updateUncommittedVoterChange(1);
+ } else {
+ kafkaRaftMetrics.updateUncommittedVoterChange(0);
+ }
Review Comment:
How about
```java
kafkaRaftMetrics.updateUncommittedVoterChange(
addVoterHandlerState.isPresent() ||
removeVoterHandlerState.isPresent()
);
```
##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -17,19 +17,22 @@
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.image.MetadataProvenance;
import com.yammer.metrics.core.Histogram;
+import org.apache.kafka.raft.internals.ExternalKRaftMetricIgnoredStaticVoters;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-public final class BrokerServerMetrics implements AutoCloseable {
+public final class BrokerServerMetrics implements AutoCloseable,
ExternalKRaftMetricIgnoredStaticVoters {
Review Comment:
See my other comments for a suggestion on how to remove this implementation.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetrics.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+public class ExternalKRaftMetrics {
Review Comment:
Let's add a unitest suite for this type. E.g. ExternalKRaftMetricsTest
##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##########
@@ -92,7 +92,8 @@ private QuorumState buildQuorumState(VoterSet voterSet,
KRaftVersion kraftVersio
new MockQuorumStateStore(),
time,
new LogContext("kafka-raft-metrics-test"),
- random
+ random,
+ raftMetrics
Review Comment:
Don't you need test for the new functionality you added?
##########
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java:
##########
@@ -243,6 +257,13 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}
+ public void switchIgnoredStaticVoters() {
+ ignoredStaticVoters.compareAndSet(false, true);
+ }
+ public boolean ignoredStaticVoters() {
+ return this.ignoredStaticVoters.get();
Review Comment:
The `this` keyword is not needed.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetrics.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+public class ExternalKRaftMetrics {
+ private final Optional<ExternalKRaftMetricIgnoredStaticVoters>
brokerServerMetrics;
+ private final Optional<ExternalKRaftMetricIgnoredStaticVoters>
controllerMetadataMetrics;
+
+ public ExternalKRaftMetrics(ExternalKRaftMetricIgnoredStaticVoters
brokerServerMetrics, ExternalKRaftMetricIgnoredStaticVoters
controllerMetadataMetrics) {
+ this.brokerServerMetrics = brokerServerMetrics != null ?
Optional.of(brokerServerMetrics) : Optional.empty();
+ this.controllerMetadataMetrics = controllerMetadataMetrics != null ?
Optional.of(controllerMetadataMetrics) : Optional.empty();
Review Comment:
You can reuse `Optional.ofNullable`. It does the same thing this expression
does.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -281,6 +294,11 @@ private void handleBatch(Batch<?> batch, OptionalLong
overrideOffset) {
switch (record.type()) {
case KRAFT_VOTERS:
VoterSet voters = VoterSet.fromVotersRecord((VotersRecord)
record.message());
+ kafkaRaftMetrics.updateNumVoters(voters.size());
+ if (staticVoterSet.isPresent() && !ignoredStaticVoterSet) {
+ ignoredStaticVoterSet = true;
Review Comment:
I wouldn't bother with this `ignoredStaticVoterSet` optimization. Voters are
not changed very often and an atomic or volatile set it not that slow.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetricIgnoredStaticVoters.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+public interface ExternalKRaftMetricIgnoredStaticVoters {
Review Comment:
I suggest naming this `ExternalKRaftMetrics`.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetricIgnoredStaticVoters.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+public interface ExternalKRaftMetricIgnoredStaticVoters {
Review Comment:
Let's write Java docs for the type and method. It is important to state that
implementations of this object are no allowed to block.
##########
raft/src/main/java/org/apache/kafka/raft/internals/ExternalKRaftMetricIgnoredStaticVoters.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+public interface ExternalKRaftMetricIgnoredStaticVoters {
+ void switchIgnoredStaticVoters();
Review Comment:
I know that I suggested using the "switch" prefix but I am starting to think
that it is not fully accurate. Maybe the prefix "set" is the best option.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##########
@@ -208,13 +230,35 @@ public void updateElectionStartMs(long currentTimeMs) {
electionStartMs = OptionalLong.of(currentTimeMs);
}
+ public void updateNumVoters(int numVoters) {
+ this.numVoters = numVoters;
+ }
+
+ public void updateNumObservers(int numObservers) {
+ this.numObservers = numObservers;
+ }
+
+ public void updateUncommittedVoterChange(int uncommittedVoterChange) {
+ this.uncommittedVoterChange = uncommittedVoterChange;
+ }
+
public void maybeUpdateElectionLatency(long currentTimeMs) {
if (electionStartMs.isPresent()) {
electionTimeSensor.record(currentTimeMs -
electionStartMs.getAsLong(), currentTimeMs);
electionStartMs = OptionalLong.empty();
}
}
+ public void addLeaderMetrics() {
+ metrics.addMetric(numObserversMetricName, (Gauge<Integer>) (config,
now) -> numObservers);
+ metrics.addMetric(uncommittedVoterChangeMetricName, (Gauge<Integer>)
(config, now) -> uncommittedVoterChange);
+ }
+
+ public void removeLeaderMetrics() {
+ metrics.removeMetric(numObserversMetricName);
+ metrics.removeMetric(uncommittedVoterChangeMetricName);
+ }
+
@Override
public void close() {
Review Comment:
Did you test what happens if `close` gets called before `initialize`?
##########
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java:
##########
@@ -243,6 +257,13 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}
+ public void switchIgnoredStaticVoters() {
+ ignoredStaticVoters.compareAndSet(false, true);
Review Comment:
No need to compare and set. This is the same as just setting to true.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##########
@@ -42,6 +42,9 @@ public class KafkaRaftMetrics implements AutoCloseable {
private volatile int numUnknownVoterConnections;
private volatile OptionalLong electionStartMs;
private volatile OptionalLong pollStartMs;
+ private volatile int numVoters;
+ private volatile int numObservers;
+ private volatile int uncommittedVoterChange;
Review Comment:
Let's make this a boolean.
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -632,7 +647,10 @@ public long epochStartOffset() {
private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
- observerStates.putIfAbsent(replicaKey, new
ReplicaState(replicaKey, false, Endpoints.empty()));
+ LeaderState.ReplicaState previous =
observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false,
Endpoints.empty()));
+ if (previous == null) {
+ kafkaRaftMetrics.updateNumObservers(observerStates.size());
+ }
Review Comment:
Hmm. I am thinking that this optimization is not necessary at the moment.
This applies to rest of the changes in this file.
##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -17,9 +17,11 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.Metrics;
Review Comment:
Do we need any unittest test for the metric values?
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -3842,6 +3842,7 @@ public void testMetrics(boolean withKip853Rpc) throws
Exception {
assertNotNull(getMetric(context.metrics, "election-latency-max"));
assertNotNull(getMetric(context.metrics, "fetch-records-rate"));
assertNotNull(getMetric(context.metrics, "append-records-rate"));
+ assertNotNull(getMetric(context.metrics, "number-of-voters"));
Review Comment:
We should add a test for the leader specific metric the replica transition
from leader to follower to leader. We need to change that after those
transition the metrics are reporting the correct value and not the previous
value.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -3850,6 +3851,8 @@ public void testMetrics(boolean withKip853Rpc) throws
Exception {
assertEquals((double) 1L, getMetric(context.metrics,
"high-watermark").metricValue());
assertEquals((double) 1L, getMetric(context.metrics,
"log-end-offset").metricValue());
assertEquals((double) epoch, getMetric(context.metrics,
"log-end-epoch").metricValue());
+ assertNotNull(getMetric(context.metrics, "number-of-observers"));
+ assertNotNull(getMetric(context.metrics, "uncommitted-voter-change"));
Review Comment:
Where it the test for not leader?
##########
raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java:
##########
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.metrics.Metrics;
Review Comment:
Where are the unittest for the metric values?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]