This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a3bed7c06d Fix the server admin endpoint cache to reflect the config changes (#9734) a3bed7c06d is described below commit a3bed7c06d06786377ec7ad6d84e9f769cb8f3dd Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Nov 4 20:01:02 2022 -0700 Fix the server admin endpoint cache to reflect the config changes (#9734) --- .../helix/core/PinotHelixResourceManager.java | 110 ++++++++++++--------- .../PinotHelixResourceManagerStatelessTest.java | 50 ++++++++-- 2 files changed, 103 insertions(+), 57 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 86f7b896f9..3e7ed24d5d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -34,7 +34,6 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,7 +45,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -68,9 +66,13 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyPathBuilder; +import 
org.apache.helix.api.listeners.BatchMode; +import org.apache.helix.api.listeners.InstanceConfigChangeListener; +import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; @@ -260,6 +262,30 @@ public class PinotHelixResourceManager { _deletedSegmentsRetentionInDays); ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster); + // Add listener on instance config changes to invalidate _instanceAdminEndpointCache + try { + helixZkManager.addInstanceConfigChangeListener(new InstanceConfigChangeListener() { + @BatchMode(enabled = false) + @PreFetch(enabled = false) + @Override + public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { + NotificationContext.Type type = context.getType(); + if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.FINALIZE + || context.getIsChildChange()) { + // Invalid all entries when the change is not within the instance config (e.g. 
set up the listener, add or + // delete an instance config) + _instanceAdminEndpointCache.invalidateAll(); + } else { + String pathChanged = context.getPathChanged(); + String instanceName = pathChanged.substring(pathChanged.lastIndexOf('/') + 1); + _instanceAdminEndpointCache.invalidate(instanceName); + } + } + }); + } catch (Exception e) { + throw new RuntimeException("Caught exception while adding InstanceConfigChangeListener"); + } + // Initialize TableCache HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build(); @@ -1336,7 +1362,7 @@ public class PinotHelixResourceManager { } else { // TODO: Add the reason of the incompatibility throw new SchemaBackwardIncompatibleException( - String.format("New schema: %s is not backward-compatible with the existing schema", schemaName)); + String.format("New schema: %s is not backward-compatible with the existing schema", schemaName)); } } ZKMetadataProvider.setSchema(_propertyStore, schema); @@ -1706,18 +1732,18 @@ public class PinotHelixResourceManager { InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs(); for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) { - boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, - instancePartitionsType); + boolean hasPreConfiguredInstancePartitions = + TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType); InstancePartitions instancePartitions; if (!hasPreConfiguredInstancePartitions) { instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null); LOGGER.info("Persisting instance partitions: {}", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); } else { - 
String referenceInstancePartitionsName = - tableConfig.getInstancePartitionsMap().get(instancePartitionsType); - instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, - referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName)); + String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); + instancePartitions = + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName, + instancePartitionsType.getInstancePartitionsName(rawTableName)); LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, referenceInstancePartitionsName); InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); @@ -1919,7 +1945,6 @@ public class PinotHelixResourceManager { // Remove all stored segments for the table - // Remove all stored segments for the table Long retentionPeriodMs = retentionPeriod != null ? 
TimeUtils.convertPeriodToMillis(retentionPeriod) : null; _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false), @@ -2037,8 +2062,8 @@ public class PinotHelixResourceManager { ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1); Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields(); return controllerJobs.entrySet().stream().filter( - job -> job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE) - .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + job -> job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } catch (ZkNoNodeException e) { LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e); } @@ -2060,10 +2085,8 @@ public class PinotHelixResourceManager { jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString()); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, - Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, - Integer.toString(numMessagesSent)); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent)); jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); return addReloadJobToZK(jobId, jobMetadata); } @@ -2079,12 +2102,9 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); 
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, - ControllerJobType.RELOAD_ALL_SEGMENTS.toString()); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, - Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, - Integer.toString(numberOfMessagesSent)); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_ALL_SEGMENTS.toString()); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); return addReloadJobToZK(jobId, jobMetadata); } @@ -2096,14 +2116,10 @@ public class PinotHelixResourceManager { Map<String, Map<String, String>> tasks = tableJobsZnRecord.getMapFields(); tasks.put(jobId, jobMetadata); if (tasks.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) { - tasks = tasks.entrySet().stream().sorted(new Comparator<Map.Entry<String, Map<String, String>>>() { - @Override - public int compare(Map.Entry<String, Map<String, String>> v1, Map.Entry<String, Map<String, String>> v2) { - return Long.compare( - Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)), - Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))); - } - }).collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) + tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare( + Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)), + Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)))) + .collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); 
} tableJobsZnRecord.setMapFields(tasks); @@ -2281,8 +2297,7 @@ public class PinotHelixResourceManager { } } - public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata, - DateTimeFieldSpec timeColumnFieldSpec) { + public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata, DateTimeFieldSpec timeColumnFieldSpec) { ZKMetadataUtils.updateSegmentZKTimeInterval(segmentZKMetadata, timeColumnFieldSpec); } @@ -2432,8 +2447,8 @@ public class PinotHelixResourceManager { private static Set<String> parseInstanceSet(IdealState idealState, String segmentName, @Nullable String targetInstance) { Set<String> instanceSet = idealState.getInstanceSet(segmentName); - Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), - "Could not find segment: %s in ideal state", segmentName); + Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state", + segmentName); if (targetInstance != null) { return instanceSet.contains(targetInstance) ? Collections.singleton(targetInstance) : Collections.emptySet(); } else { @@ -2445,11 +2460,9 @@ public class PinotHelixResourceManager { * This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}. * However instead of resetting only the ERROR state to its initial state. we reset all state regardless. */ - private void resetPartitionAllState(String instanceName, String resourceName, - Set<String> resetPartitionNames) { + private void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) { LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", - resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, - instanceName, _helixClusterName); + resetPartitionNames == null ? 
"NULL" : resetPartitionNames, resourceName, instanceName, _helixClusterName); HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -2470,20 +2483,19 @@ public class PinotHelixResourceManager { // get current state. String sessionId = liveInstance.getEphemeralOwner(); - CurrentState curState = - accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); + CurrentState curState = accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); // check there is no pending messages for the partitions exist List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); for (Message message : messages) { - if (!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId - .equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) + if (!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId.equals( + message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) || !resetPartitionNames.contains(message.getPartitionName())) { continue; } - throw new RuntimeException(String.format("Can't reset state for %s.%s on %s, " - + "because a pending message %s exists for resource %s", resourceName, resetPartitionNames, instanceName, - message.toString(), message.getResourceName())); + throw new RuntimeException( + String.format("Can't reset state for %s.%s on %s, because a pending message %s exists for resource %s", + resourceName, resetPartitionNames, instanceName, message, message.getResourceName())); } String adminName = null; @@ -2495,8 +2507,8 @@ public class PinotHelixResourceManager { adminName = "UNKNOWN"; } - List<Message> resetMessages = new ArrayList<Message>(); - List<PropertyKey> messageKeys = new ArrayList<PropertyKey>(); + List<Message> resetMessages = new ArrayList<>(); + List<PropertyKey> 
messageKeys = new ArrayList<>(); for (String partitionName : resetPartitionNames) { // send currentState to initialState message String msgId = UUID.randomUUID().toString(); @@ -3113,9 +3125,9 @@ public class PinotHelixResourceManager { String instanceAdminEndpoint; try { instanceAdminEndpoint = _instanceAdminEndpointCache.get(instance); - } catch (ExecutionException e) { + } catch (Exception e) { String errorMessage = - String.format("ExecutionException when getting instance admin endpoint for instance: %s. Error message: %s", + String.format("Caught exception while getting instance admin endpoint for instance: %s. Error message: %s", instance, e.getMessage()); LOGGER.error(errorMessage, e); throw new InvalidConfigException(errorMessage); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index 6d81f4d064..5e9def8a56 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -39,6 +39,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.common.lineage.LineageEntryState; import org.apache.pinot.common.lineage.SegmentLineage; @@ -143,24 +144,57 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { } @Test - public void testGetInstanceEndpoints() + public void testGetDataInstanceAdminEndpoints() throws Exception { Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME); - BiMap<String, String> endpoints = _helixResourceManager.getDataInstanceAdminEndpoints(servers); - // Check that we have endpoints for all instances. - assertEquals(endpoints.size(), NUM_SERVER_INSTANCES); - - // Check actual endpoint names - for (Map.Entry<String, String> entry : endpoints.entrySet()) { + BiMap<String, String> adminEndpoints = _helixResourceManager.getDataInstanceAdminEndpoints(servers); + assertEquals(adminEndpoints.size(), NUM_SERVER_INSTANCES); + for (Map.Entry<String, String> entry : adminEndpoints.entrySet()) { String key = entry.getKey(); int port = Server.DEFAULT_ADMIN_API_PORT + Integer.parseInt(key.substring("Server_localhost_".length())); assertEquals(entry.getValue(), "http://localhost:" + port); } + + // Add a new server + String serverName = "Server_localhost_" + NUM_SERVER_INSTANCES; + Instance instance = new Instance("localhost", NUM_SERVER_INSTANCES, InstanceType.SERVER, + Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE), null, 0, 12345, 0, 0, false); + _helixResourceManager.addInstance(instance, false); + adminEndpoints = _helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName)); + assertEquals(adminEndpoints.size(), 1); + assertEquals(adminEndpoints.get(serverName), "http://localhost:12345"); + + // Modify the admin port for the new added server + instance = new Instance("localhost", NUM_SERVER_INSTANCES, InstanceType.SERVER, + Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE), null, 0, 23456, 0, 0, false); + _helixResourceManager.updateInstance(serverName, instance, false); + // Admin endpoint is updated through the instance config change callback, which happens asynchronously + TestUtils.waitForCondition(aVoid -> { + try { + BiMap<String, String> endpoints = + _helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName)); + assertEquals(endpoints.size(), 1); + return 
endpoints.get(serverName).equals("http://localhost:23456"); + } catch (InvalidConfigException e) { + throw new RuntimeException(e); + } + }, 60_000L, "Failed to update the admin port"); + + // Remove the new added server + _helixResourceManager.dropInstance(serverName); + TestUtils.waitForCondition(aVoid -> { + try { + _helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName)); + return false; + } catch (InvalidConfigException e) { + return true; + } + }, 60_000L, "Failed to remove the admin endpoint"); } @Test - public void testGetInstanceConfigs() { + public void testAddRemoveInstance() { Set<String> serverInstances = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME); for (String instanceName : serverInstances) { InstanceConfig instanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org