This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ca8f96e59 Improve ingestion related logs (#15438)
3ca8f96e59 is described below

commit 3ca8f96e598b6bb26f3fac66132a0b64372f9d01
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Thu Apr 10 22:12:27 2025 +0530

    Improve ingestion related logs (#15438)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  15 ++-
 .../SegmentOnlineOfflineStateModelFactory.java     | 134 ++++++++++++++++-----
 2 files changed, 119 insertions(+), 30 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7e93f70e67..b1cc9c3205 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -622,7 +622,6 @@ public class PinotLLCRealtimeSegmentManager {
     preProcessNewSegmentZKMetadata();
 
     // Step-2: Create new segment metadata if needed
-    LOGGER.info("Creating new segment metadata with status IN_PROGRESS: {}", 
committingSegmentName);
     long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName =
         createNewSegmentMetadata(tableConfig, idealState, 
committingSegmentDescriptor, committingSegmentZKMetadata,
@@ -741,6 +740,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   // Step 2: Create new segment metadata
+  @Nullable
   private String createNewSegmentMetadata(TableConfig tableConfig, IdealState 
idealState,
       CommittingSegmentDescriptor committingSegmentDescriptor, 
SegmentZKMetadata committingSegmentZKMetadata,
       InstancePartitions instancePartitions) {
@@ -771,7 +771,11 @@ public class PinotLLCRealtimeSegmentManager {
             committingSegmentDescriptor, committingSegmentZKMetadata, 
instancePartitions, partitionIds.size(),
             numReplicas);
         newConsumingSegmentName = newLLCSegment.getSegmentName();
+        LOGGER.info("Created new segment metadata for segment: {} with status: 
{}.", newConsumingSegmentName,
+            Status.IN_PROGRESS);
       }
+    } else {
+      LOGGER.info("Skipped creation of new segment metadata as the table: {} 
is paused", realtimeTableName);
     }
     return newConsumingSegmentName;
   }
@@ -1046,13 +1050,20 @@ public class PinotLLCRealtimeSegmentManager {
 
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
     String segmentName = llcSegmentName.getSegmentName();
-    LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", 
segmentName, instanceName);
+    LOGGER.info("Attempting to mark segment: {} OFFLINE on instance: {} if it 
is currently CONSUMING.", segmentName,
+        instanceName);
     try {
       HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
         assert idealState != null;
         Map<String, String> stateMap = 
idealState.getInstanceStateMap(segmentName);
+        if (stateMap == null) {
+          LOGGER.info("Skipping update for segment: {} state to state: {} in 
ideal state as instanceStateMap is null.",
+              segmentName, SegmentStateModel.OFFLINE);
+          return idealState;
+        }
         String state = stateMap.get(instanceName);
         if (SegmentStateModel.CONSUMING.equals(state)) {
+          LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", 
segmentName, instanceName);
           stateMap.put(instanceName, SegmentStateModel.OFFLINE);
         } else {
           LOGGER.info("Segment {} in state {} when trying to register 
consumption stop from {}", segmentName, state,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 10185371fa..ea9926812a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -78,35 +78,69 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     public void onBecomeConsumingFromOffline(Message message, 
NotificationContext context)
         throws Exception {
       
_logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : 
{}", message);
-      _instanceDataManager.addConsumingSegment(message.getResourceName(), 
message.getPartitionName());
+
+      try {
+        _instanceDataManager.addConsumingSegment(message.getResourceName(), 
message.getPartitionName());
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() for "
+                + "table: {}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     @Transition(from = "CONSUMING", to = "ONLINE")
     public void onBecomeOnlineFromConsuming(Message message, 
NotificationContext context)
         throws Exception {
       
_logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : 
{}", message);
-      _instanceDataManager.addOnlineSegment(message.getResourceName(), 
message.getPartitionName());
+
+      try {
+        _instanceDataManager.addOnlineSegment(message.getResourceName(), 
message.getPartitionName());
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() for "
+                + "table: {}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     @Transition(from = "CONSUMING", to = "OFFLINE")
     public void onBecomeOfflineFromConsuming(Message message, 
NotificationContext context)
         throws Exception {
       
_logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : 
{}", message);
-      String realtimeTableName = message.getResourceName();
-      String segmentName = message.getPartitionName();
-      _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
-      _recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName, 
segmentName), true);
+      try {
+        String realtimeTableName = message.getResourceName();
+        String segmentName = message.getPartitionName();
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName, 
segmentName), true);
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() for "
+                + "table: {}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     @Transition(from = "CONSUMING", to = "DROPPED")
     public void onBecomeDroppedFromConsuming(Message message, 
NotificationContext context)
         throws Exception {
       
_logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : 
{}", message);
-      String realtimeTableName = message.getResourceName();
-      String segmentName = message.getPartitionName();
-      _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
-      _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
-      onConsumingToDropped(realtimeTableName, segmentName);
+      try {
+        String realtimeTableName = message.getResourceName();
+        String segmentName = message.getPartitionName();
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+        onConsumingToDropped(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() for "
+                + "table: {}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     /**
@@ -128,31 +162,57 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context)
         throws Exception {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() 
: {}", message);
-      _instanceDataManager.addOnlineSegment(message.getResourceName(), 
message.getPartitionName());
+
+      try {
+        _instanceDataManager.addOnlineSegment(message.getResourceName(), 
message.getPartitionName());
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() for table: "
+                + "{}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     @Transition(from = "ONLINE", to = "OFFLINE")
     public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context)
         throws Exception {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() 
: {}", message);
-      _instanceDataManager.offloadSegment(message.getResourceName(), 
message.getPartitionName());
+      try {
+        _instanceDataManager.offloadSegment(message.getResourceName(), 
message.getPartitionName());
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() for table: "
+                + "{}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     @Transition(from = "OFFLINE", to = "DROPPED")
     public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context)
         throws Exception {
       
_logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : 
{}", message);
-      String tableNameWithType = message.getResourceName();
-      String segmentName = message.getPartitionName();
-      _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
-
-      // Check if the segment is recently offloaded from CONSUMING to OFFLINE
-      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-        Pair<String, String> tableSegmentPair = Pair.of(tableNameWithType, 
segmentName);
-        if (_recentlyOffloadedConsumingSegments.getIfPresent(tableSegmentPair) 
!= null) {
-          _recentlyOffloadedConsumingSegments.invalidate(tableSegmentPair);
-          onConsumingToDropped(tableNameWithType, segmentName);
+
+      try {
+        String tableNameWithType = message.getResourceName();
+        String segmentName = message.getPartitionName();
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+
+        // Check if the segment is recently offloaded from CONSUMING to OFFLINE
+        if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+          Pair<String, String> tableSegmentPair = Pair.of(tableNameWithType, 
segmentName);
+          if 
(_recentlyOffloadedConsumingSegments.getIfPresent(tableSegmentPair) != null) {
+            _recentlyOffloadedConsumingSegments.invalidate(tableSegmentPair);
+            onConsumingToDropped(tableNameWithType, segmentName);
+          }
         }
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() for table: "
+                + "{}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
       }
     }
 
@@ -160,10 +220,19 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     public void onBecomeDroppedFromOnline(Message message, NotificationContext 
context)
         throws Exception {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() 
: {}", message);
-      String tableNameWithType = message.getResourceName();
-      String segmentName = message.getPartitionName();
-      _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
-      _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+
+      try {
+        String tableNameWithType = message.getResourceName();
+        String segmentName = message.getPartitionName();
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() for table: "
+                + "{}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
 
     @Transition(from = "ERROR", to = "OFFLINE")
@@ -175,7 +244,16 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     public void onBecomeDroppedFromError(Message message, NotificationContext 
context)
         throws Exception {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromError() 
: {}", message);
-      _instanceDataManager.deleteSegment(message.getResourceName(), 
message.getPartitionName());
+
+      try {
+        _instanceDataManager.deleteSegment(message.getResourceName(), 
message.getPartitionName());
+      } catch (Exception e) {
+        _logger.error(
+            "Caught exception while processing 
SegmentOnlineOfflineStateModel.onBecomeDroppedFromError() for table: "
+                + "{}, segment: {}",
+            message.getResourceName(), message.getPartitionName(), e);
+        throw e;
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to