sajjad-moradi commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1696290097


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,7 +230,7 @@ private long getPartitionOffsetLag(IngestionOffsets offset) {
    *
    * @param partitionGroupId partition ID which we should stop tracking.
    */
-  private void removePartitionId(int partitionGroupId) {
+  private synchronized void removePartitionId(int partitionGroupId) {

Review Comment:
   Why add `synchronized` here? All properties of this class are already thread-safe.
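
   To illustrate the question, here is a minimal sketch. The class, field, and method names are hypothetical and are not taken from `IngestionDelayTracker`. It assumes the state lives in `ConcurrentHashMap` instances, in which case each removal is already atomic on its own and a method-level lock adds nothing to per-map safety.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for illustration; not the actual IngestionDelayTracker.
class DelayTrackerSketch {
  // Both maps are individually thread-safe.
  private final Map<Integer, Long> _delayMsByPartition = new ConcurrentHashMap<>();
  private final Map<Integer, Long> _lastUpdateMsByPartition = new ConcurrentHashMap<>();

  public void record(int partitionId, long delayMs, long nowMs) {
    _delayMsByPartition.put(partitionId, delayMs);
    _lastUpdateMsByPartition.put(partitionId, nowMs);
  }

  // Each remove() is atomic on its own, so the maps cannot be corrupted without a lock.
  // The pair of removals is not atomic as a unit: a concurrent reader may briefly see
  // the partition in one map but not in the other.
  public void removePartition(int partitionId) {
    _delayMsByPartition.remove(partitionId);
    _lastUpdateMsByPartition.remove(partitionId);
  }

  public boolean isTracked(int partitionId) {
    return _delayMsByPartition.containsKey(partitionId);
  }
}
```

   Marking `removePartition` as `synchronized` would make the two removals atomic only with respect to other methods that synchronize on the same instance. In this sketch `record` does not, so the keyword alone would not close the gap.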



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1029,6 +1039,47 @@ void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>>
     }
   }
 
+  /**
+   * Handles segment movement between instances.
+   * If the new consuming segment is served by a different set of servers than the committed segment, notify the
+   * servers no longer serving the stream partition to remove the ingestion metrics. This can prevent servers from
+   * emitting high ingestion delay alerts on stream partitions no longer served.
+   */
+  private void handleSegmentMovement(String realtimeTableName, Map<String, Map<String, String>> instanceStatesMap,
+      String committedSegment, String newConsumingSegment) {
+    Set<String> oldInstances = instanceStatesMap.get(committedSegment).keySet();
+    Set<String> newInstances = instanceStatesMap.get(newConsumingSegment).keySet();
+    if (newInstances.containsAll(oldInstances)) {
+      return;
+    }
+    Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+    instancesNoLongerServe.removeAll(newInstances);
+    LOGGER.info("Segment movement detected for committed segment: {} (served by: {}), "
+            + "consuming segment: {} (served by: {}) in table: {}, "
+            + "sending message to instances: {} to remove ingestion metrics", committedSegment, oldInstances,
+        newConsumingSegment, newInstances, realtimeTableName, instancesNoLongerServe);
+
+    ClusterMessagingService messagingService = _helixManager.getMessagingService();
+    List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size());
+    for (String instance : instancesNoLongerServe) {
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setInstanceName(instance);
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setResource(realtimeTableName);
+      recipientCriteria.setPartition(committedSegment);
+      recipientCriteria.setSessionSpecific(true);
+      IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage();

Review Comment:
   I prefer the current naming convention. Creating a custom message is not 
costly, and having a separate message for each specific task is more desirable. 
This approach adheres to the ‘single responsibility’ principle.
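
   For reference, the set arithmetic that `handleSegmentMovement` in the hunk above uses to pick message recipients can be exercised in isolation. This is a sketch under assumptions: the class and method names are hypothetical, and the `getOrDefault` guard for a segment missing from the map is an addition that the PR code does not have (it calls `get(...)` directly).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of PinotLLCRealtimeSegmentManager.
class SegmentMovementSketch {
  // Returns the instances that served the committed segment but do not serve the new consuming segment.
  static Set<String> instancesNoLongerServing(Map<String, Map<String, String>> instanceStatesMap,
      String committedSegment, String newConsumingSegment) {
    // Assumption: treat a segment that is absent from the map as having no instances.
    Set<String> oldInstances =
        instanceStatesMap.getOrDefault(committedSegment, Collections.emptyMap()).keySet();
    Set<String> newInstances =
        instanceStatesMap.getOrDefault(newConsumingSegment, Collections.emptyMap()).keySet();
    // Copy before removeAll so the key set of the underlying map is left untouched.
    Set<String> noLongerServing = new HashSet<>(oldInstances);
    noLongerServing.removeAll(newInstances);
    return noLongerServing;
  }
}
```

   An empty result corresponds to the early-return case in the diff, where the new instances contain all of the old ones and no message needs to be sent.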



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
