richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821781



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -423,12 +427,17 @@ protected boolean consumeLoop()
         consecutiveIdleCount = 0;
         // We consumed something. Update the highest stream offset as well as partition-consuming metric.
         // TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value.
-//        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
-//        _currentOffset.getOffset());
-//        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
-//        _currentOffset.getOffset());
+        //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
+        //_currentOffset.getOffset());
+        //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+        //_currentOffset.getOffset());
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+      } else if (messageBatch.getUnfilteredMessageCount() > 0) {
+        // we consumed something from the stream but filtered all the content out,
+        // so we need to advance the offsets to avoid getting stuck
+        _currentOffset = messageBatch.getLastOffset();
+        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);

Review comment:
       This is the bug fix: it ensures that we advance the offsets after consuming a bad batch, i.e. one whose content was all filtered out.
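
       For illustration only, here is a minimal, self-contained sketch of the
       offset decision this branch makes. It is not the Pinot code: the class
       `OffsetAdvanceSketch`, the method `nextOffset`, and all parameters are
       hypothetical, and offsets are modelled as plain `long` values rather
       than stream offset objects. The `advanceWhenAllFiltered` flag only
       exists to contrast the behaviour with and without the new branch.

```java
// Hypothetical sketch; names and types are assumptions, not Pinot's API.
final class OffsetAdvanceSketch {

  /**
   * Decides which offset the next fetch should start from.
   *
   * @param currentOffset          offset the last fetch started from
   * @param indexedCount           messages left in the batch after filtering
   * @param unfilteredCount        messages fetched from the stream before filtering
   * @param lastOffset             offset reported by the batch as the resume point
   * @param advanceWhenAllFiltered true models the fix, false models the old behaviour
   */
  static long nextOffset(long currentOffset, int indexedCount, int unfilteredCount,
      long lastOffset, boolean advanceWhenAllFiltered) {
    if (indexedCount > 0) {
      // Something was consumed, so the offset moves forward as before.
      return lastOffset;
    }
    if (advanceWhenAllFiltered && unfilteredCount > 0) {
      // Messages were fetched but every one was filtered out.
      // Advance anyway; otherwise the same batch is fetched forever.
      return lastOffset;
    }
    // Nothing was fetched at all: stay where we are.
    return currentOffset;
  }

  public static void main(String[] args) {
    // A batch of 5 fetched messages, all of them filtered out.
    System.out.println("without fix: " + nextOffset(10L, 0, 5, 15L, false));
    System.out.println("with fix:    " + nextOffset(10L, 0, 5, 15L, true));
  }
}
```

       Without the extra branch, a fully filtered batch leaves the offset
       unchanged, so the consumer would keep re-reading the same range.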




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


