This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f54559987e Handle unsupported exception gracefully (#13524)
f54559987e is described below

commit f54559987e9b66eaed4cc68852653e14a133b9e0
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Jul 2 18:34:36 2024 +0530

    Handle unsupported exception gracefully (#13524)
    
    Co-authored-by: Kartik Khare 
<kharekar...@kartiks-macbook-pro.tail8a064.ts.net>
---
 .../pinot/core/data/manager/realtime/IngestionDelayTracker.java       | 4 ++++
 .../pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java  | 3 +--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index eed1302708..6953ddaf33 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -195,6 +195,10 @@ public class IngestionDelayTracker {
     StreamPartitionMsgOffset currentOffset = offset._offset;
     StreamPartitionMsgOffset latestOffset = offset._latestOffset;
 
+    if (currentOffset == null || latestOffset == null) {
+      return 0;
+    }
+
     // Compute aged delay for current partition
     // TODO: Support other types of offsets
     if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof 
LongMsgOffset)) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index dbfe885cc0..862ec52615 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1810,8 +1810,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private void updateIngestionMetrics(RowMetadata metadata) {
     if (metadata != null) {
       try {
-        StreamPartitionMsgOffset latestOffset =
-            
_partitionMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
 5000);
+        StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000);
         
_realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(),
             metadata.getFirstStreamRecordIngestionTimeMs(), 
metadata.getOffset(), latestOffset, _partitionGroupId);
       } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to