KKcorps commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1690763064


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1029,6 +1039,47 @@ void 
updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>>
     }
   }
 
+  /**
+   * Handles segment movement between instances.
+   * If the new consuming segment is served by a different set of servers than 
the committed segment, notify the
+   * servers no longer serving the stream partition to remove the ingestion 
metrics. This can prevent servers from
+   * emitting high ingestion delay alerts on stream partitions no longer 
served.
+   */
+  private void handleSegmentMovement(String realtimeTableName, Map<String, 
Map<String, String>> instanceStatesMap,
+      String committedSegment, String newConsumingSegment) {
+    Set<String> oldInstances = 
instanceStatesMap.get(committedSegment).keySet();
+    Set<String> newInstances = 
instanceStatesMap.get(newConsumingSegment).keySet();
+    if (newInstances.containsAll(oldInstances)) {
+      return;
+    }
+    Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+    instancesNoLongerServe.removeAll(newInstances);
+    LOGGER.info("Segment movement detected for committed segment: {} (served 
by: {}), "
+            + "consuming segment: {} (served by: {}) in table: {}, "
+            + "sending message to instances: {} to remove ingestion metrics", 
committedSegment, oldInstances,
+        newConsumingSegment, newInstances, realtimeTableName, 
instancesNoLongerServe);
+
+    ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
+    List<String> instancesSent = new 
ArrayList<>(instancesNoLongerServe.size());
+    for (String instance : instancesNoLongerServe) {
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setInstanceName(instance);
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setResource(realtimeTableName);
+      recipientCriteria.setPartition(committedSegment);
+      recipientCriteria.setSessionSpecific(true);
+      IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();

Review Comment:
   I feel that we should rename `IngestionMetricsRemoveMessage` to something 
else so that it can be reused for other purposes as well in future.  
   
   e.g. `RemoveInstanceMessage` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to