This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new d7411c5d38  Fix ramping delay caused by long lasting sequence of unfiltered messa… (#10418)
d7411c5d38 is described below

commit d7411c5d381e0c44df09092af023f5cd113e6584
Author: Juan Gomez <jugo...@linkedin.com>
AuthorDate: Fri Mar 31 22:31:28 2023 -0700

    Fix ramping delay caused by long lasting sequence of unfiltered messa… (#10418)
---
 .../manager/realtime/IngestionDelayTracker.java    | 12 ++++-
 .../realtime/LLRealtimeSegmentDataManager.java     | 51 +++++++++++++++-------
 .../plugin/stream/kafka20/KafkaMessageBatch.java   | 16 ++++++-
 .../kafka20/KafkaPartitionLevelConsumer.java       |  5 ++-
 .../org/apache/pinot/spi/stream/MessageBatch.java  | 14 ++++++
 5 files changed, 78 insertions(+), 20 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6e11297fd0..57ca21def1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -214,12 +214,20 @@ public class IngestionDelayTracker {
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
+    if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
+      // If the stream does not return valid ingestion timestamps, don't publish a metric
+      return;
+    }
     IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
         new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
     if (previousMeasure == null) {
       // First time we start tracking a partition we should start tracking it via metric
-      _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
-          ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
+      // Only publish the metric if supported by the underlying stream. If not supported, the stream
+      // returns Long.MIN_VALUE
+      if (ingestionTimeMs >= 0) {
+        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
+            () -> getPartitionIngestionDelayMs(partitionGroupId));
+      }
       if (firstStreamIngestionTimeMs >= 0) {
         // Only publish this metric when creation time is supported by the underlying stream
         // When this timestamp is not supported it always returns the value Long.MIN_VALUE
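
The guard above depends on the stream-level convention, restated in MessageBatch further below, that unsupported timestamps surface as Long.MIN_VALUE, hence the negative checks. A minimal sketch of that convention, written as hypothetical helper methods rather than actual Pinot code:

    // Hypothetical helpers illustrating the Long.MIN_VALUE sentinel used above: a
    // negative ingestion timestamp means the stream cannot supply one, so callers
    // skip publishing a delay metric rather than report a bogus value.
    static boolean isValidIngestionTimestamp(long timestampMs) {
      return timestampMs >= 0;  // Long.MIN_VALUE (or any negative value) means "unsupported"
    }

    static long ingestionDelayMs(long ingestionTimeMs, long nowMs) {
      return Math.max(0, nowMs - ingestionTimeMs);  // only meaningful for a valid timestamp
    }
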
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6236babece..4f3428a5fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -527,9 +527,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     int indexedMessageCount = 0;
     int streamMessageCount = 0;
     boolean canTakeMore = true;
+    boolean hasTransformedRows = false;
     TransformPipeline.Result reusedResult = new TransformPipeline.Result();
     boolean prematureExit = false;
+    RowMetadata msgMetadata = null;
+
     for (int index = 0; index < messageCount; index++) {
       prematureExit = _shouldStop || endCriteriaReached();
       if (prematureExit) {
@@ -562,7 +565,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
-      RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
       if (decodedRow.getException() != null) {
         // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
         // decode error
@@ -591,7 +594,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
               reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
         }
-        for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
+        List<GenericRow> transformedRows = reusedResult.getTransformedRows();
+        if (transformedRows.size() > 0) {
+          hasTransformedRows = true;
+        }
+        for (GenericRow transformedRow : transformedRows) {
           try {
             canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             indexedMessageCount++;
@@ -614,18 +621,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _numRowsConsumed++;
       streamMessageCount++;
     }
-    updateIngestionDelay(indexedMessageCount);
+
+    if (indexedMessageCount > 0) {
+      // Record ingestion delay for this partition using the metadata of the last message we processed
+      updateIngestionDelay(_lastRowMetadata);
+    } else if (!hasTransformedRows && (msgMetadata != null)) {
+      // If all messages were filtered by transformation, we still attempt to update the ingestion delay using
+      // the metadata of the last message we processed, if any.
+      updateIngestionDelay(msgMetadata);
+    }
+
     updateCurrentDocumentCountMetrics();
     if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
       _hasMessagesFetched = true;
+      if (messageCount == 0) {
+        // If we received events from the stream but all were filtered, we attempt to estimate the ingestion
+        // delay from the metadata of the last filtered message received.
+        updateIngestionDelay(messagesAndOffsets.getLastMessageMetadata());
+      }
       if (streamMessageCount > 0 && _segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
             indexedMessageCount, streamMessageCount, _currentOffset);
       }
     } else if (!prematureExit) {
       // Record Pinot ingestion delay as zero since we are up-to-date and no new events
-      long currentTimeMs = System.currentTimeMillis();
-      _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
+      setIngestionDelayToZero();
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
       }
@@ -1566,18 +1586,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
   }
 
+  private void updateIngestionDelay(RowMetadata metadata) {
+    if (metadata != null) {
+      _realtimeTableDataManager.updateIngestionDelay(metadata.getRecordIngestionTimeMs(),
+          metadata.getFirstStreamRecordIngestionTimeMs(), _partitionGroupId);
+    }
+  }
+
   /*
-   * Updates the ingestion delay if messages were processed using the time stamp for the last consumed event.
-   *
-   * @param indexedMessagesCount
+   * Sets the ingestion delay to zero in situations where we are caught up processing events.
    */
-  private void updateIngestionDelay(int indexedMessageCount) {
-    if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
-      // Record Ingestion delay for this partition
-      _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
-          _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(),
-          _partitionGroupId);
-    }
+  private void setIngestionDelayToZero() {
+    long currentTimeMs = System.currentTimeMillis();
+    _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
   }
 
   // This should be done during commit? We may not always commit when we build a segment....
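
Taken together, the consumer-loop changes above form a layered fallback for choosing which timestamp feeds the delay tracker. A condensed restatement as a sketch, reusing the updateIngestionDelay/setIngestionDelayToZero helpers introduced by this commit; the enclosing method and its parameter list are illustrative, not the actual Pinot method:

    // Summary sketch of the new update policy, not an actual Pinot method.
    void updateDelayAfterBatch(MessageBatch<?> messagesAndOffsets, int messageCount, int indexedMessageCount,
        boolean hasTransformedRows, RowMetadata lastRowMetadata, RowMetadata msgMetadata, boolean prematureExit) {
      if (indexedMessageCount > 0) {
        updateIngestionDelay(lastRowMetadata);             // rows were indexed: use the last indexed row
      } else if (!hasTransformedRows && msgMetadata != null) {
        updateIngestionDelay(msgMetadata);                 // all rows transform-filtered: use the last decoded message
      }
      if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
        if (messageCount == 0) {
          // everything was filtered upstream (e.g. tombstones): fall back to batch-level metadata
          updateIngestionDelay(messagesAndOffsets.getLastMessageMetadata());
        }
      } else if (!prematureExit) {
        setIngestionDelayToZero();                         // empty fetch: fully caught up, report zero delay
      }
    }
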
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 1852c8bdc8..dbc3e8d2a6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -24,6 +24,7 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 
@@ -31,16 +32,29 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
   private final List<StreamMessage<byte[]>> _messageList;
   private final int _unfilteredMessageCount;
   private final long _lastOffset;
+  private final StreamMessageMetadata _lastMessageMetadata;
 
   /**
    * @param unfilteredMessageCount how many messages were received from the topic before being filtered
    * @param lastOffset the offset of the last message in the batch
    * @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
+   * @param lastMessageMetadata metadata for the last filtered message in the batch, useful for estimating ingestion
+   *                            delay when a batch has all messages filtered.
    */
-  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch) {
+  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch,
+      StreamMessageMetadata lastMessageMetadata) {
     _messageList = batch;
     _lastOffset = lastOffset;
     _unfilteredMessageCount = unfilteredMessageCount;
+    _lastMessageMetadata = lastMessageMetadata;
+  }
+
+  @Override
+  /**
+   * Returns the metadata for the last filtered message if any, null otherwise.
+   */
+  public StreamMessageMetadata getLastMessageMetadata() {
+    return _lastMessageMetadata;
   }
 
   @Override
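
To make the new constructor parameter concrete, here is a sketch of the fully filtered case this change targets. The counts, the offset, and the lastRecordMetadata variable are illustrative; lastRecordMetadata is assumed to come from the Kafka metadata extractor:

    // A fetch where 25 records came off the topic but all were filtered (e.g. tombstones):
    // the batch is empty, yet the last record's metadata is retained for delay tracking.
    List<StreamMessage<byte[]>> noMessages = Collections.emptyList();
    KafkaMessageBatch batch = new KafkaMessageBatch(25, 1024L, noMessages, lastRecordMetadata);
    // batch.getMessageCount() == 0, batch.getUnfilteredMessageCount() == 25, and
    // batch.getLastMessageMetadata() != null, so the fallback path above can still fire.
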
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 46504bbc0d..ff90f4b1a3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -70,15 +70,16 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
     List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
+    StreamMessageMetadata rowMetadata = null;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
       long offset = messageAndOffset.offset();
       _lastFetchedOffset = offset;
       if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
         Bytes message = messageAndOffset.value();
+        rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
         if (message != null) {
           String key = messageAndOffset.key();
           byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null;
-          StreamMessageMetadata rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
           filtered.add(new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Tombstone message at offset: {}", offset);
@@ -89,6 +90,6 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
           endOffset);
       }
     }
-    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered, rowMetadata);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index d22a46af67..7ae4226e47 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 
@@ -116,4 +117,17 @@ public interface MessageBatch<T> {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * This is useful when determining the ingestion delay for a message batch. Retaining metadata for the last
+   * filtered message in a batch enables us to estimate the ingestion delay for the batch.
+   * Note that a batch can be fully filtered, and we can still retain the metadata for the last filtered message to
+   * facilitate computing ingestion delay in the face of a fully filtered batch.
+   *
+   * @return null by default.
+   */
+  @Nullable
+  default public StreamMessageMetadata getLastMessageMetadata() {
+    return null;
+  }
 }
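
Since getLastMessageMetadata() ships with a default implementation, existing MessageBatch implementations compile unchanged; only connectors that can retain per-message metadata need to override it. A sketch of how a caller could combine the new hook with the Long.MIN_VALUE sentinel convention (hypothetical helper, not part of Pinot):

    // Hypothetical helper: picks the ingestion timestamp to report for a batch, using
    // the new MessageBatch#getLastMessageMetadata hook for fully filtered batches.
    static long ingestionTimeToReportMs(MessageBatch<?> batch) {
      StreamMessageMetadata lastMeta = batch.getLastMessageMetadata();
      if (batch.getUnfilteredMessageCount() > 0 && batch.getMessageCount() == 0 && lastMeta != null) {
        // Every message was filtered: the batch-level metadata is the only signal left.
        return lastMeta.getRecordIngestionTimeMs();
      }
      if (batch.getUnfilteredMessageCount() == 0) {
        // Empty fetch: we are caught up, so "now" yields a reported delay of zero.
        return System.currentTimeMillis();
      }
      return Long.MIN_VALUE;  // otherwise, let per-row metadata drive the update
    }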