richardstartin commented on a change in pull request #7927: URL: https://github.com/apache/pinot/pull/7927#discussion_r771821781
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -423,12 +427,17 @@ protected boolean consumeLoop() consecutiveIdleCount = 0; // We consumed something. Update the highest stream offset as well as partition-consuming metric. // TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value. -// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, -// _currentOffset.getOffset()); -// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, -// _currentOffset.getOffset()); + //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, + //_currentOffset.getOffset()); + //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, + //_currentOffset.getOffset()); _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1); lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset); + } else if (messageBatch.getUnfilteredMessageCount() > 0) { + // we consumed something from the stream but filtered all the content out, + // so we need to advance the offsets to avoid getting stuck + _currentOffset = messageBatch.getLastOffset(); + lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset); Review comment: This is the bug fix: it ensures that we advance the offset after consuming a bad batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org