This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7a585c9ed90 Fix Helix message exception when EV doesn't exist (#16133) 7a585c9ed90 is described below commit 7a585c9ed908b6e6c32e89db8b0049e3daf52667 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Jun 17 17:20:46 2025 -0600 Fix Helix message exception when EV doesn't exist (#16133) --- .../helix/core/PinotHelixResourceManager.java | 226 ++++++--------------- .../realtime/PinotLLCRealtimeSegmentManager.java | 29 +-- .../helix/core/relocation/SegmentRelocator.java | 19 +- .../helix/core/util/MessagingServiceUtils.java | 74 +++++++ .../core/relocation/SegmentRelocatorTest.java | 4 +- 5 files changed, 147 insertions(+), 205 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1d01e23525b..8b474d2e987 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -155,6 +155,7 @@ import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; +import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.controller.workload.QueryWorkloadManager; import org.apache.pinot.segment.spi.SegmentMetadata; @@ -199,10 +200,8 @@ public class PinotHelixResourceManager { private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f); private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5; public static final String APPEND = "APPEND"; - private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500; private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500; private static final String API_REQUEST_ID_PREFIX = "api-"; - private static final int INFINITE_TIMEOUT = -1; private enum LineageUpdateType { START, END, REVERT @@ -211,8 +210,6 @@ public class PinotHelixResourceManager { // TODO: make this configurable public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second - public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 minutes - public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 second private static final DateTimeFormatter SIMPLE_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC); @@ -2759,24 +2756,10 @@ public class PinotHelixResourceManager { * Delete the table on servers by sending table deletion messages. */ private void deleteTableOnServers(String tableNameWithType) { - // External view can be null for newly created table, skip sending messages - if (_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) == null) { - LOGGER.warn("No delete table message sent for newly created table: {} without external view", tableNameWithType); - return; - } - LOGGER.info("Sending delete table messages for table: {}", tableNameWithType); - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - - // Infinite timeout on the recipient - int timeoutMs = -1; - int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs); + TableDeletionMessage message = new TableDeletionMessage(tableNameWithType); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType); if (numMessagesSent > 0) { LOGGER.info("Sent {} delete table messages for table: {}", numMessagesSent, tableNameWithType); } else { @@ -2821,27 +2804,21 @@ public class PinotHelixResourceManager { Preconditions.checkArgument(tt == TableType.OFFLINE, "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType); } - // Infinite timeout on the recipient - int timeoutMs = -1; + + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>(); for (Map.Entry<String, List<String>> entry : instanceToSegmentsMap.entrySet()) { String targetInstance = entry.getKey(); - List<String> segments = entry.getValue(); - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(targetInstance); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, segments, forceDownload); - ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, message, tableNameWithType, null, targetInstance); if (numMessagesSent > 0) { LOGGER.info("Sent {} reload messages to instance: {} for table: {}", numMessagesSent, targetInstance, tableNameWithType); } else { LOGGER.warn("No reload message sent to instance: {} for table: {}", targetInstance, tableNameWithType); } - instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, segmentReloadMessage.getMsgId())); + instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, message.getMsgId())); } return instanceMsgInfoMap; } @@ -2858,24 +2835,17 @@ public class PinotHelixResourceManager { "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType); } - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - - // Infinite timeout on the recipient - int timeoutMs = -1; - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, forceDownload); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, message, tableNameWithType, null, targetInstance); if (numMessagesSent > 0) { LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent, tableNameWithType); } else { LOGGER.warn("No reload message sent for table: {}", tableNameWithType); } - return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId()); + return Pair.of(numMessagesSent, message.getMsgId()); } public Pair<Integer, String> reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload, @@ -2891,26 +2861,18 @@ public class PinotHelixResourceManager { segmentName); } - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setPartition(segmentName); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = - new SegmentReloadMessage(tableNameWithType, Collections.singletonList(segmentName), forceDownload); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - - // Infinite timeout on the recipient - int timeoutMs = -1; - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, List.of(segmentName), forceDownload); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, message, tableNameWithType, segmentName, targetInstance); if (numMessagesSent > 0) { LOGGER.info("Sent {} reload messages for segment: {} in table: {}", numMessagesSent, segmentName, tableNameWithType); } else { LOGGER.warn("No reload message sent for segment: {} in table: {}", segmentName, tableNameWithType); } - return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId()); + + return Pair.of(numMessagesSent, message.getMsgId()); } /** @@ -3023,8 +2985,8 @@ public class PinotHelixResourceManager { */ @VisibleForTesting void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) { - LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", - resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, instanceName, _helixClusterName); + LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}", resetPartitionNames, resourceName, + instanceName); HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -3060,7 +3022,7 @@ public class PinotHelixResourceManager { + message.getResourceName()); } - String adminName = null; + String adminName; try { adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN"; } catch (UnknownHostException e) { @@ -3112,21 +3074,12 @@ public class PinotHelixResourceManager { */ public void sendSegmentRefreshMessage(String tableNameWithType, String segmentName, boolean refreshServerSegment, boolean refreshBrokerRouting) { - SegmentRefreshMessage segmentRefreshMessage = new SegmentRefreshMessage(tableNameWithType, segmentName); - - // Send segment refresh message to servers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setSessionSpecific(true); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + SegmentRefreshMessage message = new SegmentRefreshMessage(tableNameWithType, segmentName); + // Send segment refresh message to servers if (refreshServerSegment) { - // Send segment refresh message to servers - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setPartition(segmentName); - // Send message with no callback and infinite timeout on the recipient - int numMessagesSent = messagingService.send(recipientCriteria, segmentRefreshMessage, null, -1); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType, segmentName, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} segment refresh messages to servers for segment: {} of table: {}", numMessagesSent, @@ -3137,11 +3090,11 @@ public class PinotHelixResourceManager { } } + // Send segment refresh message to brokers if (refreshBrokerRouting) { - // Send segment refresh message to brokers - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setPartition(tableNameWithType); - int numMessagesSent = messagingService.send(recipientCriteria, segmentRefreshMessage, null, -1); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, message, Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, + null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} segment refresh messages to brokers for segment: {} of table: {}", numMessagesSent, @@ -3155,18 +3108,10 @@ public class PinotHelixResourceManager { /// Sends table config refresh message to brokers. private void sendTableConfigRefreshMessage(String tableNameWithType) { - TableConfigRefreshMessage tableConfigRefreshMessage = new TableConfigRefreshMessage(tableNameWithType); - - // Send table config refresh message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setPartition(tableNameWithType); - // Send message with no callback and infinite timeout on the recipient + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + TableConfigRefreshMessage message = new TableConfigRefreshMessage(tableNameWithType); int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, tableConfigRefreshMessage, null, -1); + MessagingServiceUtils.send(messagingService, message, Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", numMessagesSent, tableNameWithType); @@ -3177,43 +3122,21 @@ public class PinotHelixResourceManager { /// Sends table config and schema refresh message to servers. private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) { - TableConfigSchemaRefreshMessage refreshMessage = new TableConfigSchemaRefreshMessage(tableNameWithType); - - // Send table config and schema refresh message to servers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - - // Send message with no callback and infinite timeout on the recipient - try { - int numMessagesSent = _helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage, null, -1); - if (numMessagesSent > 0) { - LOGGER.info("Sent {} table config and schema refresh messages for table: {}", numMessagesSent, - tableNameWithType); - } else { - LOGGER.warn("No table config and schema refresh message sent for table: {}", tableNameWithType); - } - } catch (Exception e) { - LOGGER.warn("Caught exception while sending table config and schema refresh message for table: {}", - tableNameWithType, e); + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + TableConfigSchemaRefreshMessage message = new TableConfigSchemaRefreshMessage(tableNameWithType); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} table config and schema refresh messages for table: {}", numMessagesSent, tableNameWithType); + } else { + LOGGER.warn("No table config and schema refresh message sent for table: {}", tableNameWithType); } } private void sendLogicalTableConfigRefreshMessage(String logicalTableName) { - LogicalTableConfigRefreshMessage refreshMessage = new LogicalTableConfigRefreshMessage(logicalTableName); - - // Send logical table config refresh message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setPartition(logicalTableName); - // Send message with no callback and infinite timeout on the recipient + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + LogicalTableConfigRefreshMessage message = new LogicalTableConfigRefreshMessage(logicalTableName); int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage, null, -1); + MessagingServiceUtils.send(messagingService, message, Helix.BROKER_RESOURCE_INSTANCE, logicalTableName, null); if (numMessagesSent > 0) { LOGGER.info("Sent {} logical table config refresh messages to brokers for table: {}", numMessagesSent, logicalTableName); @@ -3223,18 +3146,11 @@ public class PinotHelixResourceManager { } private void sendApplicationQpsQuotaRefreshMessage(String appName) { + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); ApplicationQpsQuotaRefreshMessage message = new ApplicationQpsQuotaRefreshMessage(appName); - - // Send database config refresh message to brokers - Criteria criteria = new Criteria(); - criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - criteria.setInstanceName("%"); - criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - criteria.setSessionSpecific(true); - - int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, message, null, INFINITE_TIMEOUT); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, Helix.BROKER_RESOURCE_INSTANCE); if (numMessagesSent > 0) { - LOGGER.info("Sent {} applcation qps quota refresh messages to brokers for application: {}", numMessagesSent, + LOGGER.info("Sent {} application qps quota refresh messages to brokers for application: {}", numMessagesSent, appName); } else { LOGGER.warn("No application qps quota refresh message sent to brokers for application: {}", appName); @@ -3242,17 +3158,9 @@ public class PinotHelixResourceManager { } private void sendDatabaseConfigRefreshMessage(String databaseName) { - DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new DatabaseConfigRefreshMessage(databaseName); - - // Send database config refresh message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - // Send message with no callback and infinite timeout on the recipient - int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, databaseConfigRefreshMessage, null, -1); + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + DatabaseConfigRefreshMessage message = new DatabaseConfigRefreshMessage(databaseName); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, Helix.BROKER_RESOURCE_INSTANCE); if (numMessagesSent > 0) { LOGGER.info("Sent {} database config refresh messages to brokers for database: {}", numMessagesSent, databaseName); @@ -3262,18 +3170,10 @@ public class PinotHelixResourceManager { } private void sendRoutingTableRebuildMessage(String tableNameWithType) { - RoutingTableRebuildMessage routingTableRebuildMessage = new RoutingTableRebuildMessage(tableNameWithType); - - // Send routing table rebuild message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setPartition(tableNameWithType); - // Send message with no callback and infinite timeout on the recipient + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + RoutingTableRebuildMessage message = new RoutingTableRebuildMessage(tableNameWithType); int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1); + MessagingServiceUtils.send(messagingService, message, Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, @@ -4758,27 +4658,17 @@ public class PinotHelixResourceManager { public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableName, String periodicTaskName, Map<String, String> taskProperties) { String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8); - LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {}," + " with properties {}.\"", periodicTaskRequestId, periodicTaskName, tableName != null ? " table '" + tableName + "'" : "all tables", taskProperties); - - // Create and send message to send to all controllers (including this one) - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); - recipientCriteria.setSelfExcluded(false); - RunPeriodicTaskMessage runPeriodicTaskMessage = + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + RunPeriodicTaskMessage message = new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName, taskProperties); - - ClusterMessagingService clusterMessagingService = getHelixZkManager().getMessagingService(); - int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1); - + int numMessagesSent = + MessagingServiceUtils.sendIncludingSelf(messagingService, message, Helix.LEAD_CONTROLLER_RESOURCE_NAME); LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId, - messageCount); - return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0); + numMessagesSent); + return new PeriodicTaskInvocationResponse(periodicTaskRequestId, numMessagesSent > 0); } /** @@ -4824,13 +4714,13 @@ public class PinotHelixResourceManager { } public void sendQueryWorkloadRefreshMessage(Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) { + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); instanceToRefreshMessageMap.forEach((instance, message) -> { Criteria criteria = new Criteria(); criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); criteria.setInstanceName(instance); criteria.setSessionSpecific(true); - - int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, message, null, -1); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, criteria); if (numMessagesSent > 0) { LOGGER.info("Sent {} query workload config refresh messages to instance: {}", numMessagesSent, instance); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 012ef8d2d5b..f7bcd221d0a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -55,10 +55,8 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.AccessOption; import org.apache.helix.ClusterMessagingService; -import org.apache.helix.Criteria; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -99,6 +97,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy; import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy; +import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils; import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.core.util.PeerServerSegmentFinder; @@ -1410,16 +1409,10 @@ public class PinotLLCRealtimeSegmentManager { newConsumingSegment, newInstances, realtimeTableName, instancesNoLongerServe); ClusterMessagingService messagingService = _helixManager.getMessagingService(); + IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage(); List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size()); for (String instance : instancesNoLongerServe) { - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName(instance); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource(realtimeTableName); - recipientCriteria.setPartition(committedSegment); - recipientCriteria.setSessionSpecific(true); - IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage(); - if (messagingService.send(recipientCriteria, message, null, -1) > 0) { + if (MessagingServiceUtils.send(messagingService, message, realtimeTableName, committedSegment, instance) > 0) { instancesSent.add(instance); } else { LOGGER.warn("Failed to send ingestion metrics remove message for table: {} segment: {} to instance: {}", @@ -2358,20 +2351,14 @@ public class PinotLLCRealtimeSegmentManager { private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) { if (!consumingSegments.isEmpty()) { - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); + LOGGER.info("Sending force commit messages for segments: {} of table: {}", consumingSegments, tableNameWithType); + ClusterMessagingService messagingService = _helixManager.getMessagingService(); ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments); - int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType); if (numMessagesSent > 0) { - LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType, - consumingSegments); + LOGGER.info("Sent {} force commit messages for table: {}", numMessagesSent, tableNameWithType); } else { - throw new RuntimeException( - String.format("No force commit message was sent for table: %s segments: %s", tableNameWithType, - consumingSegments)); + throw new IllegalStateException("No force commit message sent for table: " + tableNameWithType); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index fd10148ce3e..729b0c49c72 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -33,8 +34,6 @@ import java.util.function.Consumer; import javax.annotation.Nullable; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.helix.ClusterMessagingService; -import org.apache.helix.Criteria; -import org.apache.helix.InstanceType; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.messages.SegmentReloadMessage; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -47,6 +46,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; +import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils; import org.apache.pinot.controller.util.TableTierReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -319,18 +319,11 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { Map<String, Set<String>> serverToSegmentsToMigrate, ClusterMessagingService messagingService) { for (Map.Entry<String, Set<String>> entry : serverToSegmentsToMigrate.entrySet()) { String serverName = entry.getKey(); - Set<String> segmentNames = entry.getValue(); - // One SegmentReloadMessage per server but takes all segment names. - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(serverName); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = - new SegmentReloadMessage(tableNameWithType, new ArrayList<>(segmentNames), false); + List<String> segments = new ArrayList<>(entry.getValue()); LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", serverName, - segmentNames, tableNameWithType); - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, -1); + segments, tableNameWithType); + SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, segments, false); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType, null, serverName); if (numMessagesSent > 0) { LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", serverName, tableNameWithType); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java new file mode 100644 index 00000000000..88bf6f43433 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.util; + +import javax.annotation.Nullable; +import org.apache.helix.ClusterMessagingService; +import org.apache.helix.Criteria; +import org.apache.helix.InstanceType; +import org.apache.helix.model.Message; + + +public class MessagingServiceUtils { + private MessagingServiceUtils() { + } + + /// Sends a message to the recipients specified by the criteria, returns the number of messages being sent. + public static int send(ClusterMessagingService messagingService, Message message, Criteria criteria) { + try { + return messagingService.send(criteria, message); + } catch (Exception e) { + // NOTE: + // It can throw exception when the target resource doesn't exist (e.g. ExternalView has not been created yet). It + // is normal case, and we count it as no message being sent. + return 0; + } + } + + public static int send(ClusterMessagingService messagingService, Message message, String resource, + @Nullable String partition, @Nullable String instanceName, boolean includingSelf) { + Criteria criteria = new Criteria(); + criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + criteria.setSessionSpecific(true); + criteria.setResource(resource); + if (partition != null) { + criteria.setPartition(partition); + } + if (instanceName != null) { + criteria.setInstanceName(instanceName); + } else { + criteria.setInstanceName("%"); + } + criteria.setSelfExcluded(!includingSelf); + return send(messagingService, message, criteria); + } + + public static int send(ClusterMessagingService messagingService, Message message, String resource, + @Nullable String partition, @Nullable String instanceName) { + return send(messagingService, message, resource, partition, instanceName, false); + } + + public static int send(ClusterMessagingService messagingService, Message message, String resource) { + return send(messagingService, message, resource, null, null, false); + } + + public static int sendIncludingSelf(ClusterMessagingService messagingService, Message message, String resource) { + return send(messagingService, message, resource, null, null, true); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java index e6bbcdd026a..6d058effdb6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java @@ -44,7 +44,6 @@ import org.apache.pinot.util.TestUtils; import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -94,8 +93,7 @@ public class SegmentRelocatorTest { ArgumentCaptor<Criteria> criteriaCapture = ArgumentCaptor.forClass(Criteria.class); ArgumentCaptor<SegmentReloadMessage> reloadMessageCapture = ArgumentCaptor.forClass(SegmentReloadMessage.class); - verify(messagingService, times(2)).send(criteriaCapture.capture(), reloadMessageCapture.capture(), eq(null), - eq(-1)); + verify(messagingService, times(2)).send(criteriaCapture.capture(), reloadMessageCapture.capture()); List<Criteria> criteriaList = criteriaCapture.getAllValues(); List<SegmentReloadMessage> msgList = reloadMessageCapture.getAllValues(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org