This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a585c9ed90 Fix Helix message exception when EV doesn't exist (#16133)
7a585c9ed90 is described below

commit 7a585c9ed908b6e6c32e89db8b0049e3daf52667
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Jun 17 17:20:46 2025 -0600

    Fix Helix message exception when EV doesn't exist (#16133)
---
 .../helix/core/PinotHelixResourceManager.java      | 226 ++++++---------------
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  29 +--
 .../helix/core/relocation/SegmentRelocator.java    |  19 +-
 .../helix/core/util/MessagingServiceUtils.java     |  74 +++++++
 .../core/relocation/SegmentRelocatorTest.java      |   4 +-
 5 files changed, 147 insertions(+), 205 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1d01e23525b..8b474d2e987 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -155,6 +155,7 @@ import 
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.controller.workload.QueryWorkloadManager;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -199,10 +200,8 @@ public class PinotHelixResourceManager {
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5;
   public static final String APPEND = "APPEND";
-  private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
   private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
   private static final String API_REQUEST_ID_PREFIX = "api-";
-  private static final int INFINITE_TIMEOUT = -1;
 
   private enum LineageUpdateType {
     START, END, REVERT
@@ -211,8 +210,6 @@ public class PinotHelixResourceManager {
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 
60_000L; // 10 minutes
   public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 
second
-  public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 
minutes
-  public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 
second
 
   private static final DateTimeFormatter SIMPLE_DATE_FORMAT =
       
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
@@ -2759,24 +2756,10 @@ public class PinotHelixResourceManager {
    * Delete the table on servers by sending table deletion messages.
    */
   private void deleteTableOnServers(String tableNameWithType) {
-    // External view can be null for newly created table, skip sending messages
-    if 
(_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) == 
null) {
-      LOGGER.warn("No delete table message sent for newly created table: {} 
without external view", tableNameWithType);
-      return;
-    }
-
     LOGGER.info("Sending delete table messages for table: {}", 
tableNameWithType);
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-    TableDeletionMessage tableDeletionMessage = new 
TableDeletionMessage(tableNameWithType);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
tableDeletionMessage, null, timeoutMs);
+    TableDeletionMessage message = new TableDeletionMessage(tableNameWithType);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} delete table messages for table: {}", 
numMessagesSent, tableNameWithType);
     } else {
@@ -2821,27 +2804,21 @@ public class PinotHelixResourceManager {
       Preconditions.checkArgument(tt == TableType.OFFLINE,
           "Table: %s is not an OFFLINE table, which is required to force to 
download segments", tableNameWithType);
     }
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
+
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
     for (Map.Entry<String, List<String>> entry : 
instanceToSegmentsMap.entrySet()) {
       String targetInstance = entry.getKey();
-      List<String> segments = entry.getValue();
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(targetInstance);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
-      SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, segments, forceDownload);
-      ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+      SegmentReloadMessage message = new 
SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, message, 
tableNameWithType, null, targetInstance);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent {} reload messages to instance: {} for table: {}", 
numMessagesSent, targetInstance,
             tableNameWithType);
       } else {
         LOGGER.warn("No reload message sent to instance: {} for table: {}", 
targetInstance, tableNameWithType);
       }
-      instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, 
segmentReloadMessage.getMsgId()));
+      instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, 
message.getMsgId()));
     }
     return instanceMsgInfoMap;
   }
@@ -2858,24 +2835,17 @@ public class PinotHelixResourceManager {
           "Table: %s is not an OFFLINE table, which is required to force to 
download segments", tableNameWithType);
     }
 
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-    SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, forceDownload);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+    SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, 
forceDownload);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, message, 
tableNameWithType, null, targetInstance);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent, 
tableNameWithType);
     } else {
       LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
     }
 
-    return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+    return Pair.of(numMessagesSent, message.getMsgId());
   }
 
   public Pair<Integer, String> reloadSegment(String tableNameWithType, String 
segmentName, boolean forceDownload,
@@ -2891,26 +2861,18 @@ public class PinotHelixResourceManager {
           segmentName);
     }
 
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setPartition(segmentName);
-    recipientCriteria.setSessionSpecific(true);
-    SegmentReloadMessage segmentReloadMessage =
-        new SegmentReloadMessage(tableNameWithType, 
Collections.singletonList(segmentName), forceDownload);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+    SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, 
List.of(segmentName), forceDownload);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, message, 
tableNameWithType, segmentName, targetInstance);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} reload messages for segment: {} in table: {}", 
numMessagesSent, segmentName,
           tableNameWithType);
     } else {
       LOGGER.warn("No reload message sent for segment: {} in table: {}", 
segmentName, tableNameWithType);
     }
-    return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+
+    return Pair.of(numMessagesSent, message.getMsgId());
   }
 
   /**
@@ -3023,8 +2985,8 @@ public class PinotHelixResourceManager {
    */
   @VisibleForTesting
   void resetPartitionAllState(String instanceName, String resourceName, 
Set<String> resetPartitionNames) {
-    LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster 
{}.",
-        resetPartitionNames == null ? "NULL" : resetPartitionNames, 
resourceName, instanceName, _helixClusterName);
+    LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}", 
resetPartitionNames, resourceName,
+        instanceName);
     HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -3060,7 +3022,7 @@ public class PinotHelixResourceManager {
           + message.getResourceName());
     }
 
-    String adminName = null;
+    String adminName;
     try {
       adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
     } catch (UnknownHostException e) {
@@ -3112,21 +3074,12 @@ public class PinotHelixResourceManager {
    */
   public void sendSegmentRefreshMessage(String tableNameWithType, String 
segmentName, boolean refreshServerSegment,
       boolean refreshBrokerRouting) {
-    SegmentRefreshMessage segmentRefreshMessage = new 
SegmentRefreshMessage(tableNameWithType, segmentName);
-
-    // Send segment refresh message to servers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    SegmentRefreshMessage message = new 
SegmentRefreshMessage(tableNameWithType, segmentName);
 
+    // Send segment refresh message to servers
     if (refreshServerSegment) {
-      // Send segment refresh message to servers
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setPartition(segmentName);
-      // Send message with no callback and infinite timeout on the recipient
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentRefreshMessage, null, -1);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType, segmentName, null);
       if (numMessagesSent > 0) {
         // TODO: Would be nice if we can get the name of the instances to 
which messages were sent
         LOGGER.info("Sent {} segment refresh messages to servers for segment: 
{} of table: {}", numMessagesSent,
@@ -3137,11 +3090,11 @@ public class PinotHelixResourceManager {
       }
     }
 
+    // Send segment refresh message to brokers
     if (refreshBrokerRouting) {
-      // Send segment refresh message to brokers
-      recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-      recipientCriteria.setPartition(tableNameWithType);
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentRefreshMessage, null, -1);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType,
+              null);
       if (numMessagesSent > 0) {
         // TODO: Would be nice if we can get the name of the instances to 
which messages were sent
         LOGGER.info("Sent {} segment refresh messages to brokers for segment: 
{} of table: {}", numMessagesSent,
@@ -3155,18 +3108,10 @@ public class PinotHelixResourceManager {
 
   /// Sends table config refresh message to brokers.
   private void sendTableConfigRefreshMessage(String tableNameWithType) {
-    TableConfigRefreshMessage tableConfigRefreshMessage = new 
TableConfigRefreshMessage(tableNameWithType);
-
-    // Send table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(tableNameWithType);
-    // Send message with no callback and infinite timeout on the recipient
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    TableConfigRefreshMessage message = new 
TableConfigRefreshMessage(tableNameWithType);
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
tableConfigRefreshMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
       LOGGER.info("Sent {} table config refresh messages to brokers for table: 
{}", numMessagesSent, tableNameWithType);
@@ -3177,43 +3122,21 @@ public class PinotHelixResourceManager {
 
   /// Sends table config and schema refresh message to servers.
   private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) {
-    TableConfigSchemaRefreshMessage refreshMessage = new 
TableConfigSchemaRefreshMessage(tableNameWithType);
-
-    // Send table config and schema refresh message to servers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-
-    // Send message with no callback and infinite timeout on the recipient
-    try {
-      int numMessagesSent = 
_helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage, 
null, -1);
-      if (numMessagesSent > 0) {
-        LOGGER.info("Sent {} table config and schema refresh messages for 
table: {}", numMessagesSent,
-            tableNameWithType);
-      } else {
-        LOGGER.warn("No table config and schema refresh message sent for 
table: {}", tableNameWithType);
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while sending table config and schema 
refresh message for table: {}",
-          tableNameWithType, e);
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    TableConfigSchemaRefreshMessage message = new 
TableConfigSchemaRefreshMessage(tableNameWithType);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
+    if (numMessagesSent > 0) {
+      LOGGER.info("Sent {} table config and schema refresh messages for table: 
{}", numMessagesSent, tableNameWithType);
+    } else {
+      LOGGER.warn("No table config and schema refresh message sent for table: 
{}", tableNameWithType);
     }
   }
 
   private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
-    LogicalTableConfigRefreshMessage refreshMessage = new 
LogicalTableConfigRefreshMessage(logicalTableName);
-
-    // Send logical table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(logicalTableName);
-    // Send message with no callback and infinite timeout on the recipient
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    LogicalTableConfigRefreshMessage message = new 
LogicalTableConfigRefreshMessage(logicalTableName);
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
refreshMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, logicalTableName, null);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} logical table config refresh messages to brokers 
for table: {}", numMessagesSent,
           logicalTableName);
@@ -3223,18 +3146,11 @@ public class PinotHelixResourceManager {
   }
 
   private void sendApplicationQpsQuotaRefreshMessage(String appName) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     ApplicationQpsQuotaRefreshMessage message = new 
ApplicationQpsQuotaRefreshMessage(appName);
-
-    // Send database config refresh message to brokers
-    Criteria criteria = new Criteria();
-    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    criteria.setInstanceName("%");
-    criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    criteria.setSessionSpecific(true);
-
-    int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, 
message, null, INFINITE_TIMEOUT);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, Helix.BROKER_RESOURCE_INSTANCE);
     if (numMessagesSent > 0) {
-      LOGGER.info("Sent {} applcation qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
+      LOGGER.info("Sent {} application qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
           appName);
     } else {
       LOGGER.warn("No application qps quota refresh message sent to brokers 
for application: {}", appName);
@@ -3242,17 +3158,9 @@ public class PinotHelixResourceManager {
   }
 
   private void sendDatabaseConfigRefreshMessage(String databaseName) {
-    DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new 
DatabaseConfigRefreshMessage(databaseName);
-
-    // Send database config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    // Send message with no callback and infinite timeout on the recipient
-    int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
databaseConfigRefreshMessage, null, -1);
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    DatabaseConfigRefreshMessage message = new 
DatabaseConfigRefreshMessage(databaseName);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, Helix.BROKER_RESOURCE_INSTANCE);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} database config refresh messages to brokers for 
database: {}", numMessagesSent,
           databaseName);
@@ -3262,18 +3170,10 @@ public class PinotHelixResourceManager {
   }
 
   private void sendRoutingTableRebuildMessage(String tableNameWithType) {
-    RoutingTableRebuildMessage routingTableRebuildMessage = new 
RoutingTableRebuildMessage(tableNameWithType);
-
-    // Send routing table rebuild message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(tableNameWithType);
-    // Send message with no callback and infinite timeout on the recipient
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    RoutingTableRebuildMessage message = new 
RoutingTableRebuildMessage(tableNameWithType);
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
routingTableRebuildMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
       LOGGER.info("Sent {} routing table rebuild messages to brokers for 
table: {}", numMessagesSent,
@@ -4758,27 +4658,17 @@ public class PinotHelixResourceManager {
   public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String 
tableName, String periodicTaskName,
       Map<String, String> taskProperties) {
     String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0, 8);
-
     LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all 
controllers for running task {} against {},"
             + " with properties {}.\"", periodicTaskRequestId, 
periodicTaskName,
         tableName != null ? " table '" + tableName + "'" : "all tables", 
taskProperties);
-
-    // Create and send message to send to all controllers (including this one)
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
-    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
-    recipientCriteria.setSelfExcluded(false);
-    RunPeriodicTaskMessage runPeriodicTaskMessage =
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    RunPeriodicTaskMessage message =
         new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, 
tableName, taskProperties);
-
-    ClusterMessagingService clusterMessagingService = 
getHelixZkManager().getMessagingService();
-    int messageCount = clusterMessagingService.send(recipientCriteria, 
runPeriodicTaskMessage, null, -1);
-
+    int numMessagesSent =
+        MessagingServiceUtils.sendIncludingSelf(messagingService, message, 
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
     LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to 
{} controllers.", periodicTaskRequestId,
-        messageCount);
-    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, 
messageCount > 0);
+        numMessagesSent);
+    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, 
numMessagesSent > 0);
   }
 
   /**
@@ -4824,13 +4714,13 @@ public class PinotHelixResourceManager {
   }
 
   public void sendQueryWorkloadRefreshMessage(Map<String, 
QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     instanceToRefreshMessageMap.forEach((instance, message) -> {
       Criteria criteria = new Criteria();
       criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
       criteria.setInstanceName(instance);
       criteria.setSessionSpecific(true);
-
-      int numMessagesSent = 
_helixZkManager.getMessagingService().send(criteria, message, null, -1);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, criteria);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent {} query workload config refresh messages to 
instance: {}", numMessagesSent, instance);
       } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 012ef8d2d5b..f7bcd221d0a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -55,10 +55,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -99,6 +97,7 @@ import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -1410,16 +1409,10 @@ public class PinotLLCRealtimeSegmentManager {
         newConsumingSegment, newInstances, realtimeTableName, 
instancesNoLongerServe);
 
     ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
+    IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();
     List<String> instancesSent = new 
ArrayList<>(instancesNoLongerServe.size());
     for (String instance : instancesNoLongerServe) {
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setInstanceName(instance);
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setResource(realtimeTableName);
-      recipientCriteria.setPartition(committedSegment);
-      recipientCriteria.setSessionSpecific(true);
-      IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();
-      if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+      if (MessagingServiceUtils.send(messagingService, message, 
realtimeTableName, committedSegment, instance) > 0) {
         instancesSent.add(instance);
       } else {
         LOGGER.warn("Failed to send ingestion metrics remove message for 
table: {} segment: {} to instance: {}",
@@ -2358,20 +2351,14 @@ public class PinotLLCRealtimeSegmentManager {
 
   private void sendForceCommitMessageToServers(String tableNameWithType, 
Set<String> consumingSegments) {
     if (!consumingSegments.isEmpty()) {
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setInstanceName("%");
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
+      LOGGER.info("Sending force commit messages for segments: {} of table: 
{}", consumingSegments, tableNameWithType);
+      ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
       ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, 
consumingSegments);
-      int numMessagesSent = 
_helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
       if (numMessagesSent > 0) {
-        LOGGER.info("Sent {} force commit messages for table: {} segments: 
{}", numMessagesSent, tableNameWithType,
-            consumingSegments);
+        LOGGER.info("Sent {} force commit messages for table: {}", 
numMessagesSent, tableNameWithType);
       } else {
-        throw new RuntimeException(
-            String.format("No force commit message was sent for table: %s 
segments: %s", tableNameWithType,
-                consumingSegments));
+        throw new IllegalStateException("No force commit message sent for 
table: " + tableNameWithType);
       }
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index fd10148ce3e..729b0c49c72 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -33,8 +34,6 @@ import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -47,6 +46,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -319,18 +319,11 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> {
       Map<String, Set<String>> serverToSegmentsToMigrate, ClusterMessagingService messagingService) {
     for (Map.Entry<String, Set<String>> entry : serverToSegmentsToMigrate.entrySet()) {
       String serverName = entry.getKey();
-      Set<String> segmentNames = entry.getValue();
-      // One SegmentReloadMessage per server but takes all segment names.
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(serverName);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
-      SegmentReloadMessage segmentReloadMessage =
-          new SegmentReloadMessage(tableNameWithType, new ArrayList<>(segmentNames), false);
+      List<String> segments = new ArrayList<>(entry.getValue());
       LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", serverName,
-          segmentNames, tableNameWithType);
-      int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, -1);
+          segments, tableNameWithType);
+      SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, segments, false);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType, null, serverName);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", serverName, tableNameWithType);
       } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
new file mode 100644
index 00000000000..88bf6f43433
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import javax.annotation.Nullable;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+
+public class MessagingServiceUtils {
+  private MessagingServiceUtils() {
+  }
+
+  /// Sends a message to the recipients specified by the criteria, returns the number of messages being sent.
+  public static int send(ClusterMessagingService messagingService, Message message, Criteria criteria) {
+    try {
+      return messagingService.send(criteria, message);
+    } catch (Exception e) {
+      // NOTE:
+      // It can throw exception when the target resource doesn't exist (e.g. ExternalView has not been created yet). It
+      // is normal case, and we count it as no message being sent.
+      return 0;
+    }
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message message, String resource,
+      @Nullable String partition, @Nullable String instanceName, boolean includingSelf) {
+    Criteria criteria = new Criteria();
+    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    criteria.setSessionSpecific(true);
+    criteria.setResource(resource);
+    if (partition != null) {
+      criteria.setPartition(partition);
+    }
+    if (instanceName != null) {
+      criteria.setInstanceName(instanceName);
+    } else {
+      criteria.setInstanceName("%");
+    }
+    criteria.setSelfExcluded(!includingSelf);
+    return send(messagingService, message, criteria);
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message message, String resource,
+      @Nullable String partition, @Nullable String instanceName) {
+    return send(messagingService, message, resource, partition, instanceName, false);
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message message, String resource) {
+    return send(messagingService, message, resource, null, null, false);
+  }
+
+  public static int sendIncludingSelf(ClusterMessagingService messagingService, Message message, String resource) {
+    return send(messagingService, message, resource, null, null, true);
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index e6bbcdd026a..6d058effdb6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -44,7 +44,6 @@ import org.apache.pinot.util.TestUtils;
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -94,8 +93,7 @@ public class SegmentRelocatorTest {
 
     ArgumentCaptor<Criteria> criteriaCapture = ArgumentCaptor.forClass(Criteria.class);
     ArgumentCaptor<SegmentReloadMessage> reloadMessageCapture = ArgumentCaptor.forClass(SegmentReloadMessage.class);
-    verify(messagingService, times(2)).send(criteriaCapture.capture(), reloadMessageCapture.capture(), eq(null),
-        eq(-1));
+    verify(messagingService, times(2)).send(criteriaCapture.capture(), reloadMessageCapture.capture());
 
     List<Criteria> criteriaList = criteriaCapture.getAllValues();
     List<SegmentReloadMessage> msgList = reloadMessageCapture.getAllValues();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to