sajjad-moradi commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1696290097
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,7 +230,7 @@ private long getPartitionOffsetLag(IngestionOffsets offset) {
    *
    * @param partitionGroupId partition ID which we should stop tracking.
    */
-  private void removePartitionId(int partitionGroupId) {
+  private synchronized void removePartitionId(int partitionGroupId) {

Review Comment:
   Why add synchronized here? All properties of this class are thread-safe.

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1029,6 +1039,47 @@ void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>>
     }
   }
 
+  /**
+   * Handles segment movement between instances.
+   * If the new consuming segment is served by a different set of servers than the committed segment, notify the
+   * servers no longer serving the stream partition to remove the ingestion metrics. This can prevent servers from
+   * emitting high ingestion delay alerts on stream partitions no longer served.
+   */
+  private void handleSegmentMovement(String realtimeTableName, Map<String, Map<String, String>> instanceStatesMap,
+      String committedSegment, String newConsumingSegment) {
+    Set<String> oldInstances = instanceStatesMap.get(committedSegment).keySet();
+    Set<String> newInstances = instanceStatesMap.get(newConsumingSegment).keySet();
+    if (newInstances.containsAll(oldInstances)) {
+      return;
+    }
+    Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+    instancesNoLongerServe.removeAll(newInstances);
+    LOGGER.info("Segment movement detected for committed segment: {} (served by: {}), "
+            + "consuming segment: {} (served by: {}) in table: {}, "
+            + "sending message to instances: {} to remove ingestion metrics", committedSegment, oldInstances,
+        newConsumingSegment, newInstances, realtimeTableName, instancesNoLongerServe);
+
+    ClusterMessagingService messagingService = _helixManager.getMessagingService();
+    List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size());
+    for (String instance : instancesNoLongerServe) {
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setInstanceName(instance);
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setResource(realtimeTableName);
+      recipientCriteria.setPartition(committedSegment);
+      recipientCriteria.setSessionSpecific(true);
+      IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage();

Review Comment:
   I prefer the current naming convention. Creating a custom message is not costly, and having a separate message for each specific task is more desirable. This approach adheres to the ‘single responsibility’ principle.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
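
For readers following the handleSegmentMovement hunk, here is a minimal standalone sketch of the set-difference step it relies on: the servers that hosted the committed segment but do not host the new consuming segment. This is not the PR's code. The class name, method name, segment names, and server names are hypothetical, and the getOrDefault guard is an added assumption (the diff calls get(...) directly).

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of Apache Pinot.
public class SegmentMovementSketch {

  /**
   * Returns the instances that served the committed segment but do not serve the new consuming segment.
   * instanceStatesMap maps segment name -> (instance name -> state), as in the diff.
   */
  public static Set<String> instancesNoLongerServing(Map<String, Map<String, String>> instanceStatesMap,
      String committedSegment, String newConsumingSegment) {
    // Assumption: treat a missing segment entry as "served by no instance" instead of failing.
    Set<String> oldInstances = instanceStatesMap.getOrDefault(committedSegment, Map.of()).keySet();
    Set<String> newInstances = instanceStatesMap.getOrDefault(newConsumingSegment, Map.of()).keySet();
    // Copy before removing so the caller's map is never modified.
    Set<String> result = new HashSet<>(oldInstances);
    result.removeAll(newInstances);
    return result;
  }

  public static void main(String[] args) {
    // Made-up segment and server names.
    Map<String, Map<String, String>> states = Map.of(
        "seg__0__0", Map.of("Server_1", "ONLINE", "Server_2", "ONLINE"),
        "seg__0__1", Map.of("Server_2", "CONSUMING", "Server_3", "CONSUMING"));
    // Server_1 hosted the committed segment but not the new consuming one.
    System.out.println(instancesNoLongerServing(states, "seg__0__0", "seg__0__1"));
  }
}
```

An empty result corresponds to the early return in the diff (newInstances.containsAll(oldInstances)): no server has dropped the partition, so no message needs to be sent.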