This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 652cc98  realtime debug logging (#7946)
652cc98 is described below

commit 652cc9824d260a46226d4733b5972a6b1e7d65e1
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Wed Dec 22 18:02:39 2021 +0000

    realtime debug logging (#7946)
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 25 ++++++++++++++++++----
 .../kafka20/KafkaPartitionLevelConsumer.java       | 12 +++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f463e0b..b56a794 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -397,6 +397,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       try {
         messageBatch = _partitionGroupConsumer
            .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("message batch received. filter={} 
unfiltered={} endOfPartitionGroup={}",
+              messageBatch.getUnfilteredMessageCount(), 
messageBatch.getMessageCount(),
+              messageBatch.isEndOfPartitionGroup());
+        }
         _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         _consecutiveErrorCount = 0;
       } catch (PermanentConsumerException e) {
@@ -426,8 +431,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       } else if (messageBatch.getUnfilteredMessageCount() > 0) {
        // we consumed something from the stream but filtered all the content out,
         // so we need to advance the offsets to avoid getting stuck
-        _currentOffset = messageBatch.getOffsetOfNextBatch();
-        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+        StreamPartitionMsgOffset nextOffset = messageBatch.getOffsetOfNextBatch();
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("Skipped empty batch. Advancing from {} to {}", 
_currentOffset, nextOffset);
+        }
+        _currentOffset = nextOffset;
+        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(nextOffset);
       } else {
        // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long
         // time.
@@ -459,6 +468,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     GenericRow reuse = new GenericRow();
    for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) {
       if (_shouldStop || endCriteriaReached()) {
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("stop processing message batch early 
shouldStop: {}", _shouldStop);
+        }
         break;
       }
       if (!canTakeMore) {
@@ -556,9 +568,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     updateCurrentDocumentCountMetrics();
     if (streamMessageCount != 0) {
-      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) 
current offset {}", indexedMessageCount,
-          streamMessageCount, _currentOffset);
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("Indexed {} messages ({} messages read from 
stream) current offset {}",
+            indexedMessageCount, streamMessageCount, _currentOffset);
+      }
     } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("empty batch received - sleeping for {}ms", 
idlePipeSleepTimeMillis);
+      }
      // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
      Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }
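
The second hunk above guards against a consumer stall: when every record in a fetched batch is filtered out, getMessageCount() is zero while getUnfilteredMessageCount() is positive, so the offset must still advance or the same records would be fetched forever. A minimal sketch of that control flow; Batch and fetch() are hypothetical stand-ins, not Pinot's real MessageBatch and PartitionGroupConsumer APIs:

// Illustrative sketch only. Batch and fetch() are hypothetical stand-ins
// for Pinot's MessageBatch and PartitionGroupConsumer.fetchMessages().
interface Batch {
  int messageCount();           // records that survived filtering
  int unfilteredMessageCount(); // records actually read from the stream
  long offsetOfNextBatch();
}

abstract class ConsumeLoopSketch {
  private long _currentOffset;

  abstract Batch fetch(long offset);

  void consumeOnce() {
    Batch batch = fetch(_currentOffset);
    if (batch.messageCount() > 0) {
      // normal path: index the rows, then advance the offset
      _currentOffset = batch.offsetOfNextBatch();
    } else if (batch.unfilteredMessageCount() > 0) {
      // everything was filtered out: advance anyway to avoid getting stuck
      _currentOffset = batch.offsetOfNextBatch();
    }
    // else: nothing was read; keep the offset and idle briefly before retrying
  }
}
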
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 68bbc9e..a1e0cc8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -30,11 +30,15 @@ import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
@@ -48,6 +52,10 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
   }
 
   public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: 
{}ms", _topicPartition, startOffset,
+          endOffset, timeoutMillis);
+    }
     _consumer.seek(_topicPartition, startOffset);
    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
@@ -59,8 +67,12 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
           filtered.add(new MessageAndOffset(message.get(), offset));
+        } else if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("tombstone message at offset {}", offset);
         }
         lastOffset = offset;
+      } else if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("filter message at offset {} (outside of offset range {} 
{})", offset, startOffset, endOffset);
       }
     }
    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
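
The recurring pattern in both files is SLF4J's guarded debug logging: each debug() call is wrapped in an isDebugEnabled() check so that argument expressions (for instance the getUnfilteredMessageCount() calls above) are never evaluated when DEBUG is disabled. A minimal sketch of the idiom; BatchStats is a hypothetical class used only to make the example self-contained:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLoggingSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(GuardedLoggingSketch.class);

  // Hypothetical stand-in for a message batch; imagine the getter is costly.
  static class BatchStats {
    int unfilteredCount() {
      return 42;
    }
  }

  void onBatch(BatchStats stats) {
    // Parameterized logging ("{}") already defers string formatting, but the
    // argument expression stats.unfilteredCount() would still be evaluated
    // eagerly. The isDebugEnabled() guard skips that evaluation when DEBUG is off.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("batch received, unfiltered={}", stats.unfilteredCount());
    }
  }
}

With DEBUG enabled the guard adds one cheap level check; with DEBUG disabled it saves both the varargs allocation and the argument evaluation.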

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
