This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new 652cc98  realtime debug logging (#7946)
652cc98 is described below

commit 652cc9824d260a46226d4733b5972a6b1e7d65e1
Author:     Richard Startin <rich...@startree.ai>
AuthorDate: Wed Dec 22 18:02:39 2021 +0000

    realtime debug logging (#7946)
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 25 ++++++++++++++++++----
 .../kafka20/KafkaPartitionLevelConsumer.java       | 12 +++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f463e0b..b56a794 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -397,6 +397,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     try {
       messageBatch = _partitionGroupConsumer
           .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("message batch received. unfiltered={} filtered={} endOfPartitionGroup={}",
+            messageBatch.getUnfilteredMessageCount(), messageBatch.getMessageCount(),
+            messageBatch.isEndOfPartitionGroup());
+      }
       _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
       _consecutiveErrorCount = 0;
     } catch (PermanentConsumerException e) {
@@ -426,8 +431,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     } else if (messageBatch.getUnfilteredMessageCount() > 0) {
       // we consumed something from the stream but filtered all the content out,
       // so we need to advance the offsets to avoid getting stuck
-      _currentOffset = messageBatch.getOffsetOfNextBatch();
-      lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+      StreamPartitionMsgOffset nextOffset = messageBatch.getOffsetOfNextBatch();
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("Skipped empty batch. Advancing from {} to {}", _currentOffset, nextOffset);
+      }
+      _currentOffset = nextOffset;
+      lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(nextOffset);
     } else {
       // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long
       // time.
@@ -459,6 +468,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     GenericRow reuse = new GenericRow();
     for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) {
       if (_shouldStop || endCriteriaReached()) {
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("stop processing message batch early, shouldStop: {}", _shouldStop);
+        }
         break;
       }
       if (!canTakeMore) {
@@ -556,9 +568,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     updateCurrentDocumentCountMetrics();
     if (streamMessageCount != 0) {
-      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
-          streamMessageCount, _currentOffset);
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
+            indexedMessageCount, streamMessageCount, _currentOffset);
+      }
     } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
+      }
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 68bbc9e..a1e0cc8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -30,11 +30,15 @@ import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
@@ -48,6 +52,10 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
   }
 
   public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset: {}, timeout: {}ms", _topicPartition, startOffset,
+          endOffset, timeoutMillis);
+    }
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
@@ -59,8 +67,12 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
           filtered.add(new MessageAndOffset(message.get(), offset));
+        } else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("tombstone message at offset {}", offset); } lastOffset = offset; + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug("filter message at offset {} (outside of offset range {} {})", offset, startOffset, endOffset); } } return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org