This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new ab0c27e947 Add decoder initialization error to the server's error cache (#10773)
ab0c27e947 is described below

commit ab0c27e947cbc280d3c55217a7a136259044de01
Author: Navina Ramesh <nav...@apache.org>
AuthorDate: Wed May 17 15:26:51 2023 -0700

    Add decoder initialization error to the server's error cache (#10773)

    * Add decoder initialization error to the server's error cache.
    * Adding constructor failure to error cache
---
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3428a5fe..7f96359ce0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1419,8 +1419,15 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     // Create message decoder
     Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
-    StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+    try {
+      StreamMessageDecoder streamMessageDecoder =
+          StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
+      _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+    } catch (Exception e) {
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+          new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
+      throw e;
+    }
     _transformPipeline = new TransformPipeline(tableConfig, schema);
     // Acquire semaphore to create stream consumers
     try {
@@ -1458,6 +1465,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via Helix Admin.
       _partitionGroupConsumerSemaphore.release();
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
+          "Failed to initialize segment data manager", e));
       throw e;
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org