This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 15eb120701 Add metrics for bytes ingested and bytes dropped. (#14496)
15eb120701 is described below

commit 15eb120701607b0af509b4943d02bf430379a1d1
Author: Jack Luo <jlu...@ext.uber.com>
AuthorDate: Thu Nov 21 07:02:39 2024 +0800

    Add metrics for bytes ingested and bytes dropped. (#14496)
    
    * Add metrics for bytes ingested and bytes dropped.
    
    * Use `value.length` instead of `record.data().asByteArray().length`
---
 .../org/apache/pinot/common/metrics/ServerMeter.java  |  2 ++
 .../manager/realtime/RealtimeSegmentDataManager.java  | 16 ++++++++++++++++
 .../stream/kafka20/KafkaPartitionLevelConsumer.java   |  3 ++-
 .../stream/kafka30/KafkaPartitionLevelConsumer.java   |  3 ++-
 .../pinot/plugin/stream/kinesis/KinesisConsumer.java  |  3 ++-
 .../pinot/plugin/stream/pulsar/PulsarUtils.java       |  2 +-
 .../java/org/apache/pinot/spi/stream/RowMetadata.java |  7 +++++++
 .../pinot/spi/stream/StreamMessageMetadata.java       | 19 ++++++++++++++++---
 8 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index c4f8279d5e..6a53ddc571 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -39,6 +39,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   DELETED_SEGMENT_COUNT("segments", false),
   DELETE_TABLE_FAILURES("tables", false),
   REALTIME_ROWS_CONSUMED("rows", true),
+  REALTIME_BYTES_CONSUMED("bytes", true),
+  REALTIME_BYTES_DROPPED("bytes", true),
   REALTIME_ROWS_SANITIZED("rows", true),
   REALTIME_ROWS_FETCHED("rows", false),
   REALTIME_ROWS_FILTERED("rows", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 2fbee173af..8336a6f369 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -259,6 +259,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private volatile int _numRowsConsumed = 0;
   private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled.
   private volatile int _numRowsErrored = 0;
+  private volatile long _numBytesDropped = 0;
   private volatile int _consecutiveErrorCount = 0;
   private long _startTimeMs = 0;
   private final IdleTimer _idleTimer = new IdleTimer();
@@ -430,6 +431,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     removeSegmentFile();
 
     _numRowsErrored = 0;
+    _numBytesDropped = 0;
     long idlePipeSleepTimeMillis = 100;
     long idleTimeoutMillis = _streamConfig.getIdleTimeoutMillis();
     _idleTimer.init();
@@ -528,6 +530,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     if (_numRowsErrored > 0) {
       _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+      // TODO Although the metric is called real-time, updating it at this point is not really real-time. The choice of
+      // name is partly to avoid a more convoluted name and partly in anticipation of making it real-time.
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_BYTES_DROPPED, _numBytesDropped);
     }
     return true;
   }
@@ -543,6 +548,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     _partitionRateLimiter.throttle(messageCount);
     _serverRateLimiter.throttle(messageCount);
 
+    PinotMeter realtimeBytesIngestedMeter = null;
+    PinotMeter realtimeBytesDroppedMeter = null;
     PinotMeter realtimeRowsConsumedMeter = null;
     PinotMeter realtimeRowsDroppedMeter = null;
     PinotMeter realtimeIncompleteRowsConsumedMeter = null;
@@ -599,6 +606,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       if (nextOffset == null) {
         nextOffset = messageBatch.getNextStreamPartitionMsgOffsetAtIndex(index);
       }
+      int rowSizeInBytes = null == metadata ? 0 : metadata.getRecordSerializedSize();
       if (decodedRow.getException() != null) {
         // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
         // decode error
@@ -606,12 +614,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
             _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                 realtimeRowsDroppedMeter);
         _numRowsErrored++;
+        _numBytesDropped += rowSizeInBytes;
       } else {
         try {
           _recordEnricherPipeline.run(decodedRow.getResult());
           _transformPipeline.processRow(decodedRow.getResult(), reusedResult);
         } catch (Exception e) {
           _numRowsErrored++;
+          _numBytesDropped += rowSizeInBytes;
           // when exception happens we prefer abandoning the whole batch and not partially indexing some rows
           reusedResult.getTransformedRows().clear();
           String errorMessage =
@@ -648,8 +658,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
                 _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
                     realtimeRowsConsumedMeter);
             _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
+
+            int recordSerializedValueLength = _lastRowMetadata.getRecordSerializedSize();
+            realtimeBytesIngestedMeter =
+                _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_BYTES_CONSUMED,
+                    recordSerializedValueLength, realtimeBytesIngestedMeter);
           } catch (Exception e) {
             _numRowsErrored++;
+            _numBytesDropped += rowSizeInBytes;
             String errorMessage =
                 String.format("Caught exception while indexing the record at offset: %s , row: %s", offset,
                     transformedRow);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index b72fe2dbb4..c1d4873abf 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -97,7 +97,8 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
     long offset = record.offset();
 
     StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
-        .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1));
+        .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1))
+        .setSerializedValueSize(record.serializedValueSize());
     if (_config.isPopulateMetadata()) {
       Headers headers = record.headers();
       if (headers != null) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
index 70bda562cd..0003204067 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
@@ -97,7 +97,8 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
     long offset = record.offset();
 
     StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
-        .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1));
+        .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1))
+        .setSerializedValueSize(record.serializedValueSize());
     if (_config.isPopulateMetadata()) {
       Headers headers = record.headers();
       if (headers != null) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index d90b1b61bb..de876b3071 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -151,7 +151,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     KinesisPartitionGroupOffset offset = new KinesisPartitionGroupOffset(shardId, sequenceNumber);
     // NOTE: Use the same offset as next offset because the consumer starts consuming AFTER the start sequence number.
     StreamMessageMetadata.Builder builder =
-        new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setOffset(offset, offset);
+        new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setSerializedValueSize(value.length)
+            .setOffset(offset, offset);
     if (_config.isPopulateMetadata()) {
       builder.setMetadata(Map.of(KinesisStreamMessageMetadata.APPRX_ARRIVAL_TIMESTAMP_KEY, String.valueOf(timestamp),
           KinesisStreamMessageMetadata.SEQUENCE_NUMBER_KEY, sequenceNumber));
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index e1b7b50c21..0f5ca4b858 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -84,7 +84,7 @@ public class PulsarUtils {
     MessageIdStreamOffset nextOffset = new MessageIdStreamOffset(getNextMessageId(messageId));
     StreamMessageMetadata.Builder builder =
         new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(recordIngestionTimeMs)
-            .setOffset(offset, nextOffset);
+            .setOffset(offset, nextOffset).setSerializedValueSize(message.size());
     if (config.isPopulateMetadata()) {
       Map<String, String> properties = message.getProperties();
       if (!properties.isEmpty()) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index 8c67ca71b4..459bc8cf95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -61,6 +61,13 @@ public interface RowMetadata {
     return Long.MIN_VALUE;
   }
 
+  /**
+   * @return The serialized size of the record
+   */
+  default int getRecordSerializedSize() {
+    return Integer.MIN_VALUE;
+  }
+
   /**
    * Returns the stream offset of the message.
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index e0edcb5801..152d109fbd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 public class StreamMessageMetadata implements RowMetadata {
   private final long _recordIngestionTimeMs;
   private final long _firstStreamRecordIngestionTimeMs;
+  private final int _recordSerializedSize;
   private final StreamPartitionMsgOffset _offset;
   private final StreamPartitionMsgOffset _nextOffset;
   private final GenericRow _headers;
@@ -53,16 +54,17 @@ public class StreamMessageMetadata implements RowMetadata {
   @Deprecated
   public StreamMessageMetadata(long recordIngestionTimeMs, long firstStreamRecordIngestionTimeMs,
       @Nullable GenericRow headers, Map<String, String> metadata) {
-    this(recordIngestionTimeMs, firstStreamRecordIngestionTimeMs, null, null, headers, metadata);
+    this(recordIngestionTimeMs, firstStreamRecordIngestionTimeMs, null, null, Integer.MIN_VALUE, headers, metadata);
   }
 
   public StreamMessageMetadata(long recordIngestionTimeMs, long firstStreamRecordIngestionTimeMs,
       @Nullable StreamPartitionMsgOffset offset, @Nullable StreamPartitionMsgOffset nextOffset,
-      @Nullable GenericRow headers, @Nullable Map<String, String> metadata) {
+      int recordSerializedSize, @Nullable GenericRow headers, @Nullable Map<String, String> metadata) {
     _recordIngestionTimeMs = recordIngestionTimeMs;
     _firstStreamRecordIngestionTimeMs = firstStreamRecordIngestionTimeMs;
     _offset = offset;
     _nextOffset = nextOffset;
+    _recordSerializedSize = recordSerializedSize;
     _headers = headers;
     _metadata = metadata;
   }
@@ -77,6 +79,11 @@ public class StreamMessageMetadata implements RowMetadata {
     return _firstStreamRecordIngestionTimeMs;
   }
 
+  @Override
+  public int getRecordSerializedSize() {
+    return _recordSerializedSize;
+  }
+
   @Nullable
   @Override
   public StreamPartitionMsgOffset getOffset() {
@@ -103,6 +110,7 @@ public class StreamMessageMetadata implements RowMetadata {
 
   public static class Builder {
     private long _recordIngestionTimeMs = Long.MIN_VALUE;
+    private int _recordSerializedSize = Integer.MIN_VALUE;
     private long _firstStreamRecordIngestionTimeMs = Long.MIN_VALUE;
     private StreamPartitionMsgOffset _offset;
     private StreamPartitionMsgOffset _nextOffset;
@@ -135,9 +143,14 @@ public class StreamMessageMetadata implements RowMetadata {
       return this;
     }
 
+    public Builder setSerializedValueSize(int recordSerializedSize) {
+      _recordSerializedSize = recordSerializedSize;
+      return this;
+    }
+
     public StreamMessageMetadata build() {
       return new StreamMessageMetadata(_recordIngestionTimeMs, _firstStreamRecordIngestionTimeMs, _offset, _nextOffset,
-          _headers, _metadata);
+          _recordSerializedSize, _headers, _metadata);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to