chia7712 commented on code in PR #17015:
URL: https://github.com/apache/kafka/pull/17015#discussion_r1739962917


##########
server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network.metrics;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.protocol.ApiKeys;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class RequestChannelMetrics {
+
+    private final Map<String, RequestMetrics> metricsMap;
+
+    public RequestChannelMetrics(Set<ApiKeys> enabledApis) {
+        metricsMap = new HashMap<>();
+        for (ApiKeys apiKey : enabledApis) {
+            metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name));
+        }
+        for (String name : 
Arrays.asList(RequestMetrics.CONSUMER_FETCH_METRIC_NAME, 
RequestMetrics.FOLLOW_FETCH_METRIC_NAME, 
RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME)) {
+            metricsMap.put(name, new RequestMetrics(name));
+        }
+    }
+
+    public RequestChannelMetrics(ApiMessageType.ListenerType scope) {
+        this(ApiKeys.apisForListener(scope));
+    }
+
+    public RequestMetrics apply(String metricName) {
+        return metricsMap.get(metricName);

Review Comment:
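   Could you please throw `NoSuchElementException` when the metric name is not 
found, if you want to follow the Scala style? As written, `metricsMap.get` 
returns `null` for an unknown name, whereas Scala's `Map.apply` throws.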



##########
server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network.metrics;
+
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+public class RequestMetrics {
+
+    public static final String CONSUMER_FETCH_METRIC_NAME = ApiKeys.FETCH.name 
+ "Consumer";
+    public static final String FOLLOW_FETCH_METRIC_NAME = ApiKeys.FETCH.name + 
"Follower";
+    public static final String VERIFY_PARTITIONS_IN_TXN_METRIC_NAME = 
ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification";
+    public static final String REQUESTS_PER_SEC = "RequestsPerSec";
+    public static final String DEPRECATED_REQUESTS_PER_SEC = 
"DeprecatedRequestsPerSec";
+    public static final String MESSAGE_CONVERSIONS_TIME_MS = 
"MessageConversionsTimeMs";
+    public static final String TEMPORARY_MEMORY_BYTES = "TemporaryMemoryBytes";
+
+    private static final String REQUEST_QUEUE_TIME_MS = "RequestQueueTimeMs";
+    private static final String LOCAL_TIME_MS = "LocalTimeMs";
+    private static final String REMOTE_TIME_MS = "RemoteTimeMs";
+    private static final String THROTTLE_TIME_MS = "ThrottleTimeMs";
+    private static final String RESPONSE_QUEUE_TIME_MS = "ResponseQueueTimeMs";
+    private static final String RESPONSE_SEND_TIME_MS = "ResponseSendTimeMs";
+    private static final String TOTAL_TIME_MS = "TotalTimeMs";
+    private static final String REQUEST_BYTES = "RequestBytes";
+    private static final String ERRORS_PER_SEC = "ErrorsPerSec";
+
+    // time a request spent in a request queue
+    public final Histogram requestQueueTimeHist;

Review Comment:
   Could we add updater methods for those variables instead of making them public?



##########
server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network.metrics;
+
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+public class RequestMetrics {
+
+    public static final String CONSUMER_FETCH_METRIC_NAME = ApiKeys.FETCH.name 
+ "Consumer";
+    public static final String FOLLOW_FETCH_METRIC_NAME = ApiKeys.FETCH.name + 
"Follower";
+    public static final String VERIFY_PARTITIONS_IN_TXN_METRIC_NAME = 
ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification";
+    public static final String REQUESTS_PER_SEC = "RequestsPerSec";
+    public static final String DEPRECATED_REQUESTS_PER_SEC = 
"DeprecatedRequestsPerSec";
+    public static final String MESSAGE_CONVERSIONS_TIME_MS = 
"MessageConversionsTimeMs";
+    public static final String TEMPORARY_MEMORY_BYTES = "TemporaryMemoryBytes";
+
+    private static final String REQUEST_QUEUE_TIME_MS = "RequestQueueTimeMs";
+    private static final String LOCAL_TIME_MS = "LocalTimeMs";
+    private static final String REMOTE_TIME_MS = "RemoteTimeMs";
+    private static final String THROTTLE_TIME_MS = "ThrottleTimeMs";
+    private static final String RESPONSE_QUEUE_TIME_MS = "ResponseQueueTimeMs";
+    private static final String RESPONSE_SEND_TIME_MS = "ResponseSendTimeMs";
+    private static final String TOTAL_TIME_MS = "TotalTimeMs";
+    private static final String REQUEST_BYTES = "RequestBytes";
+    private static final String ERRORS_PER_SEC = "ErrorsPerSec";
+
+    // time a request spent in a request queue
+    public final Histogram requestQueueTimeHist;
+    // time a request takes to be processed at the local broker
+    public final Histogram localTimeHist;
+    // time a request takes to wait on remote brokers (currently only relevant 
to fetch and produce requests)
+    public final Histogram remoteTimeHist;
+    // time a request is throttled, not part of the request processing time 
(throttling is done at the client level
+    // for clients that support KIP-219 and by muting the channel for the rest)
+    public final Histogram throttleTimeHist;
+    // time a response spent in a response queue
+    public final Histogram responseQueueTimeHist;
+    // time to send the response to the requester
+    public final Histogram responseSendTimeHist;
+    public final Histogram totalTimeHist;
+    // request size in bytes
+    public final Histogram requestBytesHist;
+    // time for message conversions (only relevant to fetch and produce 
requests)
+    public final Optional<Histogram> messageConversionsTimeHist;
+    // Temporary memory allocated for processing request (only populated for 
fetch and produce requests)
+    // This shows the memory allocated for compression/conversions excluding 
the actual request size
+    public final Optional<Histogram> tempMemoryBytesHist;
+
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.network", "RequestMetrics");

Review Comment:
   Could you please add comments to remind readers that this is for compatibility?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to