dajac commented on code in PR #16882:
URL: https://github.com/apache/kafka/pull/16882#discussion_r1721626010


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##########
@@ -202,6 +248,19 @@ public void recordEventQueueTime(long durationMs) { }
     @Override
     public void recordEventQueueProcessingTime(long durationMs) { }
 
+    @Override
+    public void recordEventPurgatoryTime(long purgatoryTimeMs, long writeTimestamp) {
+        if (writeTimestamp != NOT_WRITTEN) {
+            // Only record the purgatory time if records were actually appended to the local log.
+            eventPurgatoryTimeSensor.record(purgatoryTimeMs);
+        }

Review Comment:
   Ah, I see why you pass `writeTimestamp`. I wonder if we should rather let the 
caller make this decision. Otherwise, other implementations of the interface 
may be inconsistent. What do you think?
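A minimal sketch of the caller-side alternative, assuming the duration is measured from the write timestamp. `PurgatoryMetrics`, `InMemoryPurgatoryMetrics` and `maybeRecordPurgatoryTime` are hypothetical names, and the `NOT_WRITTEN` value of `-1` is an assumption. The interface takes only `durationMs`, so every implementation records unconditionally and the "was anything written?" decision lives in one place:

```java
import java.util.ArrayList;
import java.util.List;

public class PurgatoryTimeSketch {

    // Hypothetical narrowed interface: no writeTimestamp parameter, so every
    // implementation simply records the duration it is given.
    interface PurgatoryMetrics {
        void recordEventPurgatoryTime(long durationMs);
    }

    // Hypothetical in-memory implementation, used only for illustration.
    static final class InMemoryPurgatoryMetrics implements PurgatoryMetrics {
        final List<Long> samples = new ArrayList<>();

        @Override
        public void recordEventPurgatoryTime(long durationMs) {
            samples.add(durationMs);
        }
    }

    // Assumed sentinel standing in for NOT_WRITTEN from the diff; the real value may differ.
    static final long NOT_WRITTEN = -1L;

    // The caller, which knows whether records were appended, decides whether to record.
    static void maybeRecordPurgatoryTime(PurgatoryMetrics metrics, long writeTimestampMs, long nowMs) {
        if (writeTimestampMs != NOT_WRITTEN) {
            metrics.recordEventPurgatoryTime(nowMs - writeTimestampMs);
        }
    }
}
```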



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##########
@@ -53,6 +53,21 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
      */
     void recordEventQueueProcessingTime(long durationMs);
 
+    /**
+     * Record the event purgatory time.
+     *
+     * @param purgatoryTimeMs    The time the event was completed.
+     * @param writeTimestamp     The time the records were written to the log.
+     */
+    void recordEventPurgatoryTime(long purgatoryTimeMs, long writeTimestamp);

Review Comment:
   nit: Should `purgatoryTimeMs` be called `durationMs` like the others? 
Moreover, I don't really understand why we need `writeTimestamp` here. It does 
not seem necessary.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##########
@@ -42,6 +48,16 @@ public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics
      */
     public static final String NUM_PARTITIONS_METRIC_NAME = "num-partitions";
 
+    /**
+     * The event purgatory time metric name.
+     */
+    public static final String EVENT_PURGATORY_TIME_METRIC_NAME = "event-purgatory-time-ms";
+
+    /**
+     * The flush time metric name.
+     */
+    public static final String FLUSH_TIME_METRIC_NAME = "flush-time-ms";

Review Comment:
   nit: I wonder if we should call it `batch-flush-time-ms` to make it clear 
that it is the flush time of batches.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##########
@@ -202,6 +248,19 @@ public void recordEventQueueTime(long durationMs) { }
     @Override
     public void recordEventQueueProcessingTime(long durationMs) { }

Review Comment:
   Should we also implement `recordEventQueueTime` and 
`recordEventQueueProcessingTime`? I think we need histograms for those too.
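A rough sketch of what replacing the two no-op methods could look like. To stay self-contained it swaps in a naive, hypothetical `NaiveHistogram` that keeps every sample and computes nearest-rank percentiles; this is not the PR's `KafkaMetricHistogram`, and a real implementation would use a bounded structure such as HdrHistogram:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class QueueTimeHistogramSketch {

    // Naive stand-in for a histogram: keeps every sample (unbounded memory)
    // and computes nearest-rank percentiles on demand.
    static final class NaiveHistogram {
        private final List<Long> samples = new ArrayList<>();

        synchronized void record(long valueMs) {
            samples.add(valueMs);
        }

        // Nearest-rank percentile for 0 < p <= 100; returns 0 when empty.
        synchronized long percentile(double p) {
            if (samples.isEmpty()) {
                return 0L;
            }
            List<Long> sorted = new ArrayList<>(samples);
            Collections.sort(sorted);
            int rank = (int) Math.ceil(p / 100.0 * sorted.size());
            return sorted.get(Math.max(rank, 1) - 1);
        }
    }

    final NaiveHistogram eventQueueTime = new NaiveHistogram();
    final NaiveHistogram eventQueueProcessingTime = new NaiveHistogram();

    // Hypothetical replacements for the no-op methods shown in the diff.
    public void recordEventQueueTime(long durationMs) {
        eventQueueTime.record(durationMs);
    }

    public void recordEventQueueProcessingTime(long durationMs) {
        eventQueueProcessingTime.record(durationMs);
    }
}
```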



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1302,6 +1312,9 @@ public void run() {
                     if (!future.isDone()) {
                         operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
                         timer.add(operationTimeout);
+
+                        // Only update when the records were appended to the local log.
+                        writeTimestamp = time.milliseconds();

Review Comment:
   What's the reasoning for only updating the metric if records were appended? 
The event may wait in the purgatory even without writing records.
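A sketch of the alternative this comment implies: stamp each event when it enters the purgatory and record the elapsed time when it completes, whether or not that event appended records itself. `DeferredQueueSketch`, `Pending` and `completeUpTo` are hypothetical names loosely modeled on an offset-keyed deferred queue, not the runtime's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.LongConsumer;

public class DeferredQueueSketch {

    // A hypothetical event waiting in the purgatory, stamped on entry.
    static final class Pending {
        final String name;
        final long addedAtMs;

        Pending(String name, long addedAtMs) {
            this.name = name;
            this.addedAtMs = addedAtMs;
        }
    }

    private final TreeMap<Long, List<Pending>> pendingByOffset = new TreeMap<>();
    private final LongConsumer purgatoryTimeRecorder;

    DeferredQueueSketch(LongConsumer purgatoryTimeRecorder) {
        this.purgatoryTimeRecorder = purgatoryTimeRecorder;
    }

    // The clock starts when the event enters the purgatory.
    void add(long offset, String name, long nowMs) {
        pendingByOffset.computeIfAbsent(offset, k -> new ArrayList<>()).add(new Pending(name, nowMs));
    }

    // Completes every event waiting for an offset <= highWatermark and
    // records how long each one spent in the purgatory.
    List<String> completeUpTo(long highWatermark, long nowMs) {
        List<String> completed = new ArrayList<>();
        NavigableMap<Long, List<Pending>> ready = pendingByOffset.headMap(highWatermark, true);
        for (List<Pending> events : ready.values()) {
            for (Pending event : events) {
                purgatoryTimeRecorder.accept(nowMs - event.addedAtMs);
                completed.add(event.name);
            }
        }
        ready.clear(); // removes the completed entries from the backing map
        return completed;
    }
}
```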



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -771,6 +774,7 @@ private void flushCurrentBatch() {
                     for (DeferredEvent event : currentBatch.deferredEvents) {
                         deferredEventQueue.add(offset, event);
                     }
+                    runtimeMetrics.recordFlushTime(time.milliseconds() - flushStartMs);

Review Comment:
   We also write in `completeTransaction`. Should we also record there?
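One way to make sure both write paths record the metric is to route every log append through a single timed helper. The sketch below is hypothetical (`FlushTimingSketch` and `timedAppend` are illustrative names, with the clock and recorder injected as functional interfaces). It records the duration only after a successful append, mirroring where `recordFlushTime` sits in the diff; wrapping the append in `try`/`finally` would also time failed appends:

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class FlushTimingSketch {

    private final LongSupplier clockMs;
    private final LongConsumer flushTimeRecorder;

    FlushTimingSketch(LongSupplier clockMs, LongConsumer flushTimeRecorder) {
        this.clockMs = clockMs;
        this.flushTimeRecorder = flushTimeRecorder;
    }

    // Hypothetical helper that every log-append path would call (e.g. both the
    // batch flush and the transaction-completion write), so the flush-time
    // metric cannot be forgotten on one of them. The append returns the
    // resulting offset; the duration is recorded only on success.
    long timedAppend(LongSupplier append) {
        long startMs = clockMs.getAsLong();
        long offset = append.getAsLong();
        flushTimeRecorder.accept(clockMs.getAsLong() - startMs);
        return offset;
    }
}
```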



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1580,6 +1601,9 @@ public void run() {
                     if (!future.isDone()) {
                         operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
                         timer.add(operationTimeout);
+
+                        // Only update when the records were appended to the local log.
+                        writeTimestamp = time.milliseconds();

Review Comment:
   In this patch, we always write, I think.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/KafkaMetricHistogram.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A compound stat providing various metrics based on an internally maintained {@link HdrHistogram}.
+ * It should be more performant, more precise, and much more flexible regarding concurrent usage
+ * compared to either the most commonly used in the Kafka codebase Yammer histograms or the existing
+ * Kafka Metrics alternatives like {@link Percentiles}.
+ */
+public final class KafkaMetricHistogram implements CompoundStat {
+
+    /**
+     * The number of significant digits used for the histogram.
+     */
+    public static final int NUM_SIG_FIGS = 3;
+
+    /**
+     * Max latency value.
+     */
+    public static final long MAX_LATENCY_MS = Duration.ofMinutes(1).toMillis();
+
+    /**
+     * Suffix used for the histogram's max value.
+     */
+    private static final String MAX_NAME = "max";
+
+    /**
+     * Set list of percentiles we will provide metrics for.
+     */
+    private static final Map<Double, String> PERCENTILE_NAMES =
+        Utils.mkMap(
+            Utils.mkEntry(95.0, "p95"),

Review Comment:
   I wonder if we should also have p50?
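A small sketch of the percentile map with p50 added. It uses a plain `LinkedHashMap` instead of Kafka's `Utils.mkMap` to stay self-contained. Only the p95 entry is visible in the diff, so any further entries in the real map are not reproduced. The `<base>-<suffix>` naming in the hypothetical `percentileMetricName` helper is an assumption, not the PR's confirmed scheme:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class PercentileNamesSketch {

    // Percentile -> metric-name suffix: p95 from the diff plus the suggested p50.
    static final Map<Double, String> PERCENTILE_NAMES;

    static {
        Map<Double, String> names = new LinkedHashMap<>();
        names.put(50.0, "p50");
        names.put(95.0, "p95");
        PERCENTILE_NAMES = Collections.unmodifiableMap(names);
    }

    // Assumed naming scheme "<base>-<suffix>"; the PR may build names differently.
    static String percentileMetricName(String baseName, double percentile) {
        String suffix = PERCENTILE_NAMES.get(percentile);
        if (suffix == null) {
            throw new IllegalArgumentException("Unsupported percentile: " + percentile);
        }
        return baseName + "-" + suffix;
    }
}
```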


