This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ab0c27e947 Add decoder initialization error to the server's error cache (#10773)
ab0c27e947 is described below

commit ab0c27e947cbc280d3c55217a7a136259044de01
Author: Navina Ramesh <nav...@apache.org>
AuthorDate: Wed May 17 15:26:51 2023 -0700

    Add decoder initialization error to the server's error cache (#10773)
    
    * Add decoder initialization error to the server's error cache.
    * Adding constructor failure to error cache
---
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3428a5fe..7f96359ce0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1419,8 +1419,15 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
     // Create message decoder
     Set<String> fieldsToRead = 
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), 
_schema);
-    StreamMessageDecoder streamMessageDecoder = 
StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+    try {
+      StreamMessageDecoder streamMessageDecoder =
+          StreamDecoderProvider.create(_partitionLevelStreamConfig, 
fieldsToRead);
+      _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+    } catch (Exception e) {
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+          new SegmentErrorInfo(now(), "Failed to initialize the 
StreamMessageDecoder", e));
+      throw e;
+    }
     _transformPipeline = new TransformPipeline(tableConfig, schema);
     // Acquire semaphore to create stream consumers
     try {
@@ -1458,6 +1465,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the 
semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via 
Helix Admin.
       _partitionGroupConsumerSemaphore.release();
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(),
+          "Failed to initialize segment data manager", e));
       throw e;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to