KKcorps commented on code in PR #13668: URL: https://github.com/apache/pinot/pull/13668#discussion_r1690763064
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1029,6 +1039,47 @@ void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>>
     }
   }
 
+  /**
+   * Handles segment movement between instances.
+   * If the new consuming segment is served by a different set of servers than the committed segment, notify the
+   * servers no longer serving the stream partition to remove the ingestion metrics. This can prevent servers from
+   * emitting high ingestion delay alerts on stream partitions no longer served.
+   */
+  private void handleSegmentMovement(String realtimeTableName, Map<String, Map<String, String>> instanceStatesMap,
+      String committedSegment, String newConsumingSegment) {
+    Set<String> oldInstances = instanceStatesMap.get(committedSegment).keySet();
+    Set<String> newInstances = instanceStatesMap.get(newConsumingSegment).keySet();
+    if (newInstances.containsAll(oldInstances)) {
+      return;
+    }
+    Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+    instancesNoLongerServe.removeAll(newInstances);
+    LOGGER.info("Segment movement detected for committed segment: {} (served by: {}), "
+            + "consuming segment: {} (served by: {}) in table: {}, "
+            + "sending message to instances: {} to remove ingestion metrics", committedSegment, oldInstances,
+        newConsumingSegment, newInstances, realtimeTableName, instancesNoLongerServe);
+
+    ClusterMessagingService messagingService = _helixManager.getMessagingService();
+    List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size());
+    for (String instance : instancesNoLongerServe) {
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setInstanceName(instance);
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setResource(realtimeTableName);
+      recipientCriteria.setPartition(committedSegment);
+      recipientCriteria.setSessionSpecific(true);
+      IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage();

Review Comment:
   I feel that we should rename `IngestionMetricsRemoveMessage` to something more general so that it can be reused for other purposes in the future, e.g. `RemoveInstanceMessage`.
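
For anyone reading the hunk without the surrounding file, the recipient selection in `handleSegmentMovement` comes down to a set difference: instances that host the committed segment but not the new consuming segment. Below is a minimal standalone sketch of just that step, not the PR's code. The class name `SegmentMovementSketch`, the helper name, and the segment and server names are made up for illustration, and the `getOrDefault` guard against a missing segment entry is an added assumption that does not appear in the hunk.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SegmentMovementSketch {

  /**
   * Returns the instances that serve the committed segment but not the new consuming segment.
   * The map layout (segment name -> instance name -> state) mirrors instanceStatesMap in the hunk.
   */
  static Set<String> instancesNoLongerServing(Map<String, Map<String, String>> instanceStatesMap,
      String committedSegment, String newConsumingSegment) {
    // Assumption: treat a missing segment entry as "no instances" instead of failing on null.
    Set<String> oldInstances = instanceStatesMap.getOrDefault(committedSegment, Map.of()).keySet();
    Set<String> newInstances = instanceStatesMap.getOrDefault(newConsumingSegment, Map.of()).keySet();
    // Copy before removing so the key set of the original map is never modified.
    // A TreeSet is used only to make the printed order deterministic.
    Set<String> removed = new TreeSet<>(oldInstances);
    removed.removeAll(newInstances);
    return removed;
  }

  public static void main(String[] args) {
    // Hypothetical segment and server names.
    Map<String, Map<String, String>> states = Map.of(
        "myTable__0__0", Map.of("Server_1", "ONLINE", "Server_2", "ONLINE"),
        "myTable__0__1", Map.of("Server_2", "CONSUMING", "Server_3", "CONSUMING"));
    // prints [Server_1]
    System.out.println(instancesNoLongerServing(states, "myTable__0__0", "myTable__0__1"));
  }
}
```

An empty result corresponds to the early `return` in the hunk (`newInstances.containsAll(oldInstances)`), so in this sketch a caller would skip messaging whenever the returned set is empty.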