This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 55806ba484 Do not log exceptions when fetching offsets for lag metric (#13528) 55806ba484 is described below commit 55806ba484f75d3629b11c043eebe25aed222630 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Wed Jul 3 12:42:28 2024 +0530 Do not log exceptions when fetching offsets for lag metric (#13528) Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.tail8a064.ts.net> --- .../realtime/RealtimeSegmentDataManager.java | 29 ++++++++++++++++------ 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 862ec52615..c26b2c14f3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1682,25 +1682,38 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return _idleTimer.getTimeSinceEventLastConsumedMs(); } + public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) { + return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog); + } + public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) { - return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs); + return fetchLatestStreamOffset(maxWaitTimeMs, false); + } + + public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) { + return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog); } public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs) { - return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, maxWaitTimeMs); + return fetchEarliestStreamOffset(maxWaitTimeMs, false); } - private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs) { + private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs, + boolean useDebugLog) { if (_partitionMetadataProvider == null) { createPartitionMetadataProvider("Fetch latest stream offset"); } try { return _partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, maxWaitTimeMs); } catch (Exception e) { - _segmentLogger.warn( - String.format( - "Cannot fetch stream offset with criteria %s for clientId %s and partitionGroupId %d with maxWaitTime %d", - offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs), e); + String logMessage = String.format( + "Cannot fetch stream offset with criteria %s for clientId %s and partitionGroupId %d with maxWaitTime %d", + offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs); + if (!useDebugLog) { + _segmentLogger.warn(logMessage, e); + } else { + _segmentLogger.debug(logMessage, e); + } } return null; } @@ -1810,7 +1823,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private void updateIngestionMetrics(RowMetadata metadata) { if (metadata != null) { try { - StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000); + StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true); _realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(), latestOffset, _partitionGroupId); } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org