This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3ca8f96e59 Improve ingestion related logs (#15438) 3ca8f96e59 is described below commit 3ca8f96e598b6bb26f3fac66132a0b64372f9d01 Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Thu Apr 10 22:12:27 2025 +0530 Improve ingestion related logs (#15438) --- .../realtime/PinotLLCRealtimeSegmentManager.java | 15 ++- .../SegmentOnlineOfflineStateModelFactory.java | 134 ++++++++++++++++----- 2 files changed, 119 insertions(+), 30 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 7e93f70e67..b1cc9c3205 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -622,7 +622,6 @@ public class PinotLLCRealtimeSegmentManager { preProcessNewSegmentZKMetadata(); // Step-2: Create new segment metadata if needed - LOGGER.info("Creating new segment metadata with status IN_PROGRESS: {}", committingSegmentName); long startTimeNs2 = System.nanoTime(); String newConsumingSegmentName = createNewSegmentMetadata(tableConfig, idealState, committingSegmentDescriptor, committingSegmentZKMetadata, @@ -741,6 +740,7 @@ public class PinotLLCRealtimeSegmentManager { } // Step 2: Create new segment metadata + @Nullable private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idealState, CommittingSegmentDescriptor committingSegmentDescriptor, SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions) { @@ -771,7 +771,11 @@ public class PinotLLCRealtimeSegmentManager { committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas); newConsumingSegmentName = newLLCSegment.getSegmentName(); + LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName, + Status.IN_PROGRESS); } + } else { + LOGGER.info("Skipped creation of new segment metadata as the table: {} is paused", realtimeTableName); } return newConsumingSegmentName; } @@ -1046,13 +1050,20 @@ public class PinotLLCRealtimeSegmentManager { String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName()); String segmentName = llcSegmentName.getSegmentName(); - LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName); + LOGGER.info("Attempting to mark segment: {} OFFLINE on instance: {} if it is currently CONSUMING.", segmentName, + instanceName); try { HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName); + if (stateMap == null) { + LOGGER.info("Skipping update for segment: {} state to state: {} in ideal state as instanceStateMap is null.", + segmentName, SegmentStateModel.OFFLINE); + return idealState; + } String state = stateMap.get(instanceName); if (SegmentStateModel.CONSUMING.equals(state)) { + LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName); stateMap.put(instanceName, SegmentStateModel.OFFLINE); } else { LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 10185371fa..ea9926812a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -78,35 +78,69 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta public void onBecomeConsumingFromOffline(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : {}", message); - _instanceDataManager.addConsumingSegment(message.getResourceName(), message.getPartitionName()); + + try { + _instanceDataManager.addConsumingSegment(message.getResourceName(), message.getPartitionName()); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() for " + + "table: {}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } @Transition(from = "CONSUMING", to = "ONLINE") public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : {}", message); - _instanceDataManager.addOnlineSegment(message.getResourceName(), message.getPartitionName()); + + try { + _instanceDataManager.addOnlineSegment(message.getResourceName(), message.getPartitionName()); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() for " + + "table: {}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } @Transition(from = "CONSUMING", to = "OFFLINE") public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : {}", message); - String realtimeTableName = message.getResourceName(); - String segmentName = message.getPartitionName(); - _instanceDataManager.offloadSegment(realtimeTableName, segmentName); - _recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName, segmentName), true); + try { + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + _recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName, segmentName), true); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() for " + + "table: {}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } @Transition(from = "CONSUMING", to = "DROPPED") public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : {}", message); - String realtimeTableName = message.getResourceName(); - String segmentName = message.getPartitionName(); - _instanceDataManager.offloadSegment(realtimeTableName, segmentName); - _instanceDataManager.deleteSegment(realtimeTableName, segmentName); - onConsumingToDropped(realtimeTableName, segmentName); + try { + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + _instanceDataManager.deleteSegment(realtimeTableName, segmentName); + onConsumingToDropped(realtimeTableName, segmentName); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() for " + + "table: {}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } /** @@ -128,31 +162,57 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta public void onBecomeOnlineFromOffline(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message); - _instanceDataManager.addOnlineSegment(message.getResourceName(), message.getPartitionName()); + + try { + _instanceDataManager.addOnlineSegment(message.getResourceName(), message.getPartitionName()); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() for table: " + + "{}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } @Transition(from = "ONLINE", to = "OFFLINE") public void onBecomeOfflineFromOnline(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message); - _instanceDataManager.offloadSegment(message.getResourceName(), message.getPartitionName()); + try { + _instanceDataManager.offloadSegment(message.getResourceName(), message.getPartitionName()); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() for table: " + + "{}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } @Transition(from = "OFFLINE", to = "DROPPED") public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message); - String tableNameWithType = message.getResourceName(); - String segmentName = message.getPartitionName(); - _instanceDataManager.deleteSegment(tableNameWithType, segmentName); - - // Check if the segment is recently offloaded from CONSUMING to OFFLINE - if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { - Pair<String, String> tableSegmentPair = Pair.of(tableNameWithType, segmentName); - if (_recentlyOffloadedConsumingSegments.getIfPresent(tableSegmentPair) != null) { - _recentlyOffloadedConsumingSegments.invalidate(tableSegmentPair); - onConsumingToDropped(tableNameWithType, segmentName); + + try { + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + + // Check if the segment is recently offloaded from CONSUMING to OFFLINE + if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + Pair<String, String> tableSegmentPair = Pair.of(tableNameWithType, segmentName); + if (_recentlyOffloadedConsumingSegments.getIfPresent(tableSegmentPair) != null) { + _recentlyOffloadedConsumingSegments.invalidate(tableSegmentPair); + onConsumingToDropped(tableNameWithType, segmentName); + } } + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() for table: " + + "{}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; } } @@ -160,10 +220,19 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta public void onBecomeDroppedFromOnline(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message); - String tableNameWithType = message.getResourceName(); - String segmentName = message.getPartitionName(); - _instanceDataManager.offloadSegment(tableNameWithType, segmentName); - _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + + try { + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() for table: " + + "{}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } @Transition(from = "ERROR", to = "OFFLINE") @@ -175,7 +244,16 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta public void onBecomeDroppedFromError(Message message, NotificationContext context) throws Exception { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : {}", message); - _instanceDataManager.deleteSegment(message.getResourceName(), message.getPartitionName()); + + try { + _instanceDataManager.deleteSegment(message.getResourceName(), message.getPartitionName()); + } catch (Exception e) { + _logger.error( + "Caught exception while processing SegmentOnlineOfflineStateModel.onBecomeDroppedFromError() for table: " + + "{}, segment: {}", + message.getResourceName(), message.getPartitionName(), e); + throw e; + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org