AndrewJSchofield commented on code in PR #16488:
URL: https://github.com/apache/kafka/pull/16488#discussion_r1666522093
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams,
String groupId, String m
this.partitionMaxBytes = partitionMaxBytes;
}
}
+
+ static class ShareGroupMetrics {
+ /**
+ * share-acknowledgement (share-acknowledgement-rate and
share-acknowledgement-count) - The total number of offsets acknowledged for
share groups (requests to be ack).
+ * record-acknowledgement (record-acknowledgement-rate and
record-acknowledgement-count) - The number of records acknowledged per
acknowledgement type.
+ * partition-load-time (partition-load-time-avg and
partition-load-time-max) - The time taken to load the share partitions.
+ */
+
+ public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+ public static final String SHARE_ACK_SENSOR =
"share-acknowledgement-sensor";
+ public static final String SHARE_ACK_RATE =
"share-acknowledgement-rate";
+ public static final String SHARE_ACK_COUNT =
"share-acknowledgement-count";
+
+ public static final String RECORD_ACK_SENSOR_PREFIX =
"record-acknowledgement";
+ public static final String RECORD_ACK_RATE =
"record-acknowledgement-rate";
+ public static final String RECORD_ACK_COUNT =
"record-acknowledgement-count";
+ public static final String ACK_TYPE = "ack-type";
+
+ public static final String PARTITION_LOAD_TIME_SENSOR =
"partition-load-time-sensor";
+ public static final String PARTITION_LOAD_TIME_AVG =
"partition-load-time-avg";
+ public static final String PARTITION_LOAD_TIME_MAX =
"partition-load-time-max";
+
+ public static final Map<Byte, String> RECORD_ACKS_MAP = new
HashMap<>();
+
+ private final Time time;
+ private final Sensor shareAcknowledgementSensor;
+ private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+ private final Sensor partitionLoadTimeSensor;
+
+ static {
+ RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+ RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+ RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+ }
+
+ public ShareGroupMetrics(Metrics metrics, Time time) {
+ this.time = time;
+
+ shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+ shareAcknowledgementSensor.add(new Meter(
+ metrics.metricName(
+ SHARE_ACK_RATE,
+ METRICS_GROUP_NAME,
+ "The rate of number of acknowledge requests."),
+ metrics.metricName(
+ SHARE_ACK_COUNT,
+ METRICS_GROUP_NAME,
+ "The number of acknowledge requests.")));
+
+ for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
+ recordAcksSensorMap.put(entry.getKey(),
metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX,
entry.getValue())));
+ recordAcksSensorMap.get(entry.getKey())
+ .add(new Meter(
+ metrics.metricName(
+ RECORD_ACK_RATE,
+ METRICS_GROUP_NAME,
+ "The rate of number of records acknowledged per
acknowledgement type.",
+ ACK_TYPE, entry.getValue()),
+ metrics.metricName(
+ RECORD_ACK_COUNT,
+ METRICS_GROUP_NAME,
+ "The number of records acknowledged per
acknowledgement type.",
+ ACK_TYPE, entry.getValue())));
+ }
+
+ partitionLoadTimeSensor =
metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
+ partitionLoadTimeSensor.add(metrics.metricName(
+ PARTITION_LOAD_TIME_AVG,
+ METRICS_GROUP_NAME,
+ "The average time in milliseconds to load the share
partitions."),
+ new Avg());
+ partitionLoadTimeSensor.add(metrics.metricName(
+ PARTITION_LOAD_TIME_MAX,
+ METRICS_GROUP_NAME,
+ "The maximum time in milliseconds to load the share
partitions."),
+ new Max());
+ }
+
+ void shareAcknowledgement() {
+ shareAcknowledgementSensor.record();
+ }
+
+ void recordAcknowledgement(byte ackType) {
+ if (recordAcksSensorMap.containsKey(ackType)) {
+ recordAcksSensorMap.get(ackType).record();
+ } else {
Review Comment:
The ackType can be zero when a gap is being acknowledged. For example, when
the client receives a batch of transactional records, it responds with 0 for
the offsets that correspond to the transactional control records. With this
code, the broker will log an error for each of those records.
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams,
String groupId, String m
this.partitionMaxBytes = partitionMaxBytes;
}
}
+
+ static class ShareGroupMetrics {
+ /**
+ * share-acknowledgement (share-acknowledgement-rate and
share-acknowledgement-count) - The total number of offsets acknowledged for
share groups (requests to be ack).
+ * record-acknowledgement (record-acknowledgement-rate and
record-acknowledgement-count) - The number of records acknowledged per
acknowledgement type.
+ * partition-load-time (partition-load-time-avg and
partition-load-time-max) - The time taken to load the share partitions.
+ */
+
+ public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+ public static final String SHARE_ACK_SENSOR =
"share-acknowledgement-sensor";
+ public static final String SHARE_ACK_RATE =
"share-acknowledgement-rate";
+ public static final String SHARE_ACK_COUNT =
"share-acknowledgement-count";
+
+ public static final String RECORD_ACK_SENSOR_PREFIX =
"record-acknowledgement";
+ public static final String RECORD_ACK_RATE =
"record-acknowledgement-rate";
+ public static final String RECORD_ACK_COUNT =
"record-acknowledgement-count";
+ public static final String ACK_TYPE = "ack-type";
+
+ public static final String PARTITION_LOAD_TIME_SENSOR =
"partition-load-time-sensor";
+ public static final String PARTITION_LOAD_TIME_AVG =
"partition-load-time-avg";
+ public static final String PARTITION_LOAD_TIME_MAX =
"partition-load-time-max";
+
+ public static final Map<Byte, String> RECORD_ACKS_MAP = new
HashMap<>();
+
+ private final Time time;
+ private final Sensor shareAcknowledgementSensor;
+ private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+ private final Sensor partitionLoadTimeSensor;
+
+ static {
+ RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+ RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+ RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+ }
+
+ public ShareGroupMetrics(Metrics metrics, Time time) {
+ this.time = time;
+
+ shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+ shareAcknowledgementSensor.add(new Meter(
+ metrics.metricName(
+ SHARE_ACK_RATE,
+ METRICS_GROUP_NAME,
+ "The rate of number of acknowledge requests."),
Review Comment:
"Rate of number" sounds odd. "Rate of acknowledge requests" or "acknowledge
request rate" would be better.
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams,
String groupId, String m
this.partitionMaxBytes = partitionMaxBytes;
}
}
+
+ static class ShareGroupMetrics {
+ /**
+ * share-acknowledgement (share-acknowledgement-rate and
share-acknowledgement-count) - The total number of offsets acknowledged for
share groups (requests to be ack).
+ * record-acknowledgement (record-acknowledgement-rate and
record-acknowledgement-count) - The number of records acknowledged per
acknowledgement type.
+ * partition-load-time (partition-load-time-avg and
partition-load-time-max) - The time taken to load the share partitions.
+ */
+
+ public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+ public static final String SHARE_ACK_SENSOR =
"share-acknowledgement-sensor";
+ public static final String SHARE_ACK_RATE =
"share-acknowledgement-rate";
+ public static final String SHARE_ACK_COUNT =
"share-acknowledgement-count";
+
+ public static final String RECORD_ACK_SENSOR_PREFIX =
"record-acknowledgement";
+ public static final String RECORD_ACK_RATE =
"record-acknowledgement-rate";
+ public static final String RECORD_ACK_COUNT =
"record-acknowledgement-count";
+ public static final String ACK_TYPE = "ack-type";
+
+ public static final String PARTITION_LOAD_TIME_SENSOR =
"partition-load-time-sensor";
+ public static final String PARTITION_LOAD_TIME_AVG =
"partition-load-time-avg";
+ public static final String PARTITION_LOAD_TIME_MAX =
"partition-load-time-max";
+
+ public static final Map<Byte, String> RECORD_ACKS_MAP = new
HashMap<>();
+
+ private final Time time;
+ private final Sensor shareAcknowledgementSensor;
+ private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+ private final Sensor partitionLoadTimeSensor;
+
+ static {
+ RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+ RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+ RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+ }
+
+ public ShareGroupMetrics(Metrics metrics, Time time) {
+ this.time = time;
+
+ shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+ shareAcknowledgementSensor.add(new Meter(
+ metrics.metricName(
+ SHARE_ACK_RATE,
+ METRICS_GROUP_NAME,
+ "The rate of number of acknowledge requests."),
+ metrics.metricName(
+ SHARE_ACK_COUNT,
+ METRICS_GROUP_NAME,
+ "The number of acknowledge requests.")));
+
+ for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
+ recordAcksSensorMap.put(entry.getKey(),
metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX,
entry.getValue())));
+ recordAcksSensorMap.get(entry.getKey())
+ .add(new Meter(
+ metrics.metricName(
+ RECORD_ACK_RATE,
+ METRICS_GROUP_NAME,
+ "The rate of number of records acknowledged per
acknowledgement type.",
Review Comment:
"Rate of number" sounds odd. "Rate of records acknowledged per
acknowledgement type" sounds better I think.
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams,
String groupId, String m
this.partitionMaxBytes = partitionMaxBytes;
}
}
+
+ static class ShareGroupMetrics {
+ /**
+ * share-acknowledgement (share-acknowledgement-rate and
share-acknowledgement-count) - The total number of offsets acknowledged for
share groups (requests to be ack).
+ * record-acknowledgement (record-acknowledgement-rate and
record-acknowledgement-count) - The number of records acknowledged per
acknowledgement type.
+ * partition-load-time (partition-load-time-avg and
partition-load-time-max) - The time taken to load the share partitions.
+ */
+
+ public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+ public static final String SHARE_ACK_SENSOR =
"share-acknowledgement-sensor";
+ public static final String SHARE_ACK_RATE =
"share-acknowledgement-rate";
+ public static final String SHARE_ACK_COUNT =
"share-acknowledgement-count";
+
+ public static final String RECORD_ACK_SENSOR_PREFIX =
"record-acknowledgement";
+ public static final String RECORD_ACK_RATE =
"record-acknowledgement-rate";
+ public static final String RECORD_ACK_COUNT =
"record-acknowledgement-count";
+ public static final String ACK_TYPE = "ack-type";
+
+ public static final String PARTITION_LOAD_TIME_SENSOR =
"partition-load-time-sensor";
+ public static final String PARTITION_LOAD_TIME_AVG =
"partition-load-time-avg";
+ public static final String PARTITION_LOAD_TIME_MAX =
"partition-load-time-max";
+
+ public static final Map<Byte, String> RECORD_ACKS_MAP = new
HashMap<>();
+
+ private final Time time;
+ private final Sensor shareAcknowledgementSensor;
+ private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+ private final Sensor partitionLoadTimeSensor;
+
+ static {
+ RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+ RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+ RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+ }
+
+ public ShareGroupMetrics(Metrics metrics, Time time) {
+ this.time = time;
+
+ shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+ shareAcknowledgementSensor.add(new Meter(
+ metrics.metricName(
+ SHARE_ACK_RATE,
+ METRICS_GROUP_NAME,
+ "The rate of number of acknowledge requests."),
Review Comment:
"Rate of acknowledge requests" is probably the best wording, given my next comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]