This is an automated email from the ASF dual-hosted git repository. lqc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 15eb120701 Add metrics for bytes ingested and bytes dropped. (#14496) 15eb120701 is described below commit 15eb120701607b0af509b4943d02bf430379a1d1 Author: Jack Luo <jlu...@ext.uber.com> AuthorDate: Thu Nov 21 07:02:39 2024 +0800 Add metrics for bytes ingested and bytes dropped. (#14496) * Add metrics for bytes ingested and bytes dropped. * Use `value.length` instead of `record.data().asByteArray().length` --- .../org/apache/pinot/common/metrics/ServerMeter.java | 2 ++ .../manager/realtime/RealtimeSegmentDataManager.java | 16 ++++++++++++++++ .../stream/kafka20/KafkaPartitionLevelConsumer.java | 3 ++- .../stream/kafka30/KafkaPartitionLevelConsumer.java | 3 ++- .../pinot/plugin/stream/kinesis/KinesisConsumer.java | 3 ++- .../pinot/plugin/stream/pulsar/PulsarUtils.java | 2 +- .../java/org/apache/pinot/spi/stream/RowMetadata.java | 7 +++++++ .../pinot/spi/stream/StreamMessageMetadata.java | 19 ++++++++++++++++--- 8 files changed, 48 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index c4f8279d5e..6a53ddc571 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -39,6 +39,8 @@ public enum ServerMeter implements AbstractMetrics.Meter { DELETED_SEGMENT_COUNT("segments", false), DELETE_TABLE_FAILURES("tables", false), REALTIME_ROWS_CONSUMED("rows", true), + REALTIME_BYTES_CONSUMED("bytes", true), + REALTIME_BYTES_DROPPED("bytes", true), REALTIME_ROWS_SANITIZED("rows", true), REALTIME_ROWS_FETCHED("rows", false), REALTIME_ROWS_FILTERED("rows", false), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 2fbee173af..8336a6f369 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -259,6 +259,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private volatile int _numRowsConsumed = 0; private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled. private volatile int _numRowsErrored = 0; + private volatile long _numBytesDropped = 0; private volatile int _consecutiveErrorCount = 0; private long _startTimeMs = 0; private final IdleTimer _idleTimer = new IdleTimer(); @@ -430,6 +431,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { removeSegmentFile(); _numRowsErrored = 0; + _numBytesDropped = 0; long idlePipeSleepTimeMillis = 100; long idleTimeoutMillis = _streamConfig.getIdleTimeoutMillis(); _idleTimer.init(); @@ -528,6 +530,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { if (_numRowsErrored > 0) { _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored); _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored); + // TODO Although the metric is called real-time, updating it at this point is not really real-time. The choice of + // name is partly to avoid a more convoluted name and partly in anticipation of making it real-time. + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_BYTES_DROPPED, _numBytesDropped); } return true; } @@ -543,6 +548,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _partitionRateLimiter.throttle(messageCount); _serverRateLimiter.throttle(messageCount); + PinotMeter realtimeBytesIngestedMeter = null; + PinotMeter realtimeBytesDroppedMeter = null; PinotMeter realtimeRowsConsumedMeter = null; PinotMeter realtimeRowsDroppedMeter = null; PinotMeter realtimeIncompleteRowsConsumedMeter = null; @@ -599,6 +606,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { if (nextOffset == null) { nextOffset = messageBatch.getNextStreamPartitionMsgOffsetAtIndex(index); } + int rowSizeInBytes = null == metadata ? 0 : metadata.getRecordSerializedSize(); if (decodedRow.getException() != null) { // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on // decode error @@ -606,12 +614,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, realtimeRowsDroppedMeter); _numRowsErrored++; + _numBytesDropped += rowSizeInBytes; } else { try { _recordEnricherPipeline.run(decodedRow.getResult()); _transformPipeline.processRow(decodedRow.getResult(), reusedResult); } catch (Exception e) { _numRowsErrored++; + _numBytesDropped += rowSizeInBytes; // when exception happens we prefer abandoning the whole batch and not partially indexing some rows reusedResult.getTransformedRows().clear(); String errorMessage = @@ -648,8 +658,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L); + + int recordSerializedValueLength = _lastRowMetadata.getRecordSerializedSize(); + realtimeBytesIngestedMeter = + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_BYTES_CONSUMED, + recordSerializedValueLength, realtimeBytesIngestedMeter); } catch (Exception e) { _numRowsErrored++; + _numBytesDropped += rowSizeInBytes; String errorMessage = String.format("Caught exception while indexing the record at offset: %s , row: %s", offset, transformedRow); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index b72fe2dbb4..c1d4873abf 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -97,7 +97,8 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa long offset = record.offset(); StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp) - .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1)); + .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1)) + .setSerializedValueSize(record.serializedValueSize()); if (_config.isPopulateMetadata()) { Headers headers = record.headers(); if (headers != null) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java index 70bda562cd..0003204067 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java @@ -97,7 +97,8 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa long offset = record.offset(); StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp) - .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1)); + .setOffset(new LongMsgOffset(offset), new LongMsgOffset(offset + 1)) + .setSerializedValueSize(record.serializedValueSize()); if (_config.isPopulateMetadata()) { Headers headers = record.headers(); if (headers != null) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index d90b1b61bb..de876b3071 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -151,7 +151,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti KinesisPartitionGroupOffset offset = new KinesisPartitionGroupOffset(shardId, sequenceNumber); // NOTE: Use the same offset as next offset because the consumer starts consuming AFTER the start sequence number. StreamMessageMetadata.Builder builder = - new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setOffset(offset, offset); + new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setSerializedValueSize(value.length) + .setOffset(offset, offset); if (_config.isPopulateMetadata()) { builder.setMetadata(Map.of(KinesisStreamMessageMetadata.APPRX_ARRIVAL_TIMESTAMP_KEY, String.valueOf(timestamp), KinesisStreamMessageMetadata.SEQUENCE_NUMBER_KEY, sequenceNumber)); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java index e1b7b50c21..0f5ca4b858 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java @@ -84,7 +84,7 @@ public class PulsarUtils { MessageIdStreamOffset nextOffset = new MessageIdStreamOffset(getNextMessageId(messageId)); StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(recordIngestionTimeMs) - .setOffset(offset, nextOffset); + .setOffset(offset, nextOffset).setSerializedValueSize(message.size()); if (config.isPopulateMetadata()) { Map<String, String> properties = message.getProperties(); if (!properties.isEmpty()) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java index 8c67ca71b4..459bc8cf95 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java @@ -61,6 +61,13 @@ public interface RowMetadata { return Long.MIN_VALUE; } + /** + * @return The serialized size of the record + */ + default int getRecordSerializedSize() { + return Integer.MIN_VALUE; + } + /** * Returns the stream offset of the message. */ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java index e0edcb5801..152d109fbd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; public class StreamMessageMetadata implements RowMetadata { private final long _recordIngestionTimeMs; private final long _firstStreamRecordIngestionTimeMs; + private final int _recordSerializedSize; private final StreamPartitionMsgOffset _offset; private final StreamPartitionMsgOffset _nextOffset; private final GenericRow _headers; @@ -53,16 +54,17 @@ public class StreamMessageMetadata implements RowMetadata { @Deprecated public StreamMessageMetadata(long recordIngestionTimeMs, long firstStreamRecordIngestionTimeMs, @Nullable GenericRow headers, Map<String, String> metadata) { - this(recordIngestionTimeMs, firstStreamRecordIngestionTimeMs, null, null, headers, metadata); + this(recordIngestionTimeMs, firstStreamRecordIngestionTimeMs, null, null, Integer.MIN_VALUE, headers, metadata); } public StreamMessageMetadata(long recordIngestionTimeMs, long firstStreamRecordIngestionTimeMs, @Nullable StreamPartitionMsgOffset offset, @Nullable StreamPartitionMsgOffset nextOffset, - @Nullable GenericRow headers, @Nullable Map<String, String> metadata) { + int recordSerializedSize, @Nullable GenericRow headers, @Nullable Map<String, String> metadata) { _recordIngestionTimeMs = recordIngestionTimeMs; _firstStreamRecordIngestionTimeMs = firstStreamRecordIngestionTimeMs; _offset = offset; _nextOffset = nextOffset; + _recordSerializedSize = recordSerializedSize; _headers = headers; _metadata = metadata; } @@ -77,6 +79,11 @@ public class StreamMessageMetadata implements RowMetadata { return _firstStreamRecordIngestionTimeMs; } + @Override + public int getRecordSerializedSize() { + return _recordSerializedSize; + } + @Nullable @Override public StreamPartitionMsgOffset getOffset() { @@ -103,6 +110,7 @@ public class StreamMessageMetadata implements RowMetadata { public static class Builder { private long _recordIngestionTimeMs = Long.MIN_VALUE; + private int _recordSerializedSize = Integer.MIN_VALUE; private long _firstStreamRecordIngestionTimeMs = Long.MIN_VALUE; private StreamPartitionMsgOffset _offset; private StreamPartitionMsgOffset _nextOffset; @@ -135,9 +143,14 @@ public class StreamMessageMetadata implements RowMetadata { return this; } + public Builder setSerializedValueSize(int recordSerializedSize) { + _recordSerializedSize = recordSerializedSize; + return this; + } + public StreamMessageMetadata build() { return new StreamMessageMetadata(_recordIngestionTimeMs, _firstStreamRecordIngestionTimeMs, _offset, _nextOffset, - _headers, _metadata); + _recordSerializedSize, _headers, _metadata); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org