This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a3bed7c06d Fix the server admin endpoint cache to reflect the config changes (#9734)
a3bed7c06d is described below

commit a3bed7c06d06786377ec7ad6d84e9f769cb8f3dd
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Nov 4 20:01:02 2022 -0700

    Fix the server admin endpoint cache to reflect the config changes (#9734)
---
 .../helix/core/PinotHelixResourceManager.java      | 110 ++++++++++++---------
 .../PinotHelixResourceManagerStatelessTest.java    |  50 ++++++++--
 2 files changed, 103 insertions(+), 57 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 86f7b896f9..3e7ed24d5d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -34,7 +34,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,7 +45,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
@@ -68,9 +66,13 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.api.listeners.BatchMode;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
@@ -260,6 +262,30 @@ public class PinotHelixResourceManager {
         _deletedSegmentsRetentionInDays);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, 
_isSingleTenantCluster);
 
+    // Add listener on instance config changes to invalidate 
_instanceAdminEndpointCache
+    try {
+      helixZkManager.addInstanceConfigChangeListener(new 
InstanceConfigChangeListener() {
+        @BatchMode(enabled = false)
+        @PreFetch(enabled = false)
+        @Override
+        public void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) {
+          NotificationContext.Type type = context.getType();
+          if (type == NotificationContext.Type.INIT || type == 
NotificationContext.Type.FINALIZE
+              || context.getIsChildChange()) {
+            // Invalid all entries when the change is not within the instance 
config (e.g. set up the listener, add or
+            // delete an instance config)
+            _instanceAdminEndpointCache.invalidateAll();
+          } else {
+            String pathChanged = context.getPathChanged();
+            String instanceName = 
pathChanged.substring(pathChanged.lastIndexOf('/') + 1);
+            _instanceAdminEndpointCache.invalidate(instanceName);
+          }
+        }
+      });
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while adding 
InstanceConfigChangeListener");
+    }
+
     // Initialize TableCache
     HelixConfigScope helixConfigScope =
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
@@ -1336,7 +1362,7 @@ public class PinotHelixResourceManager {
       } else {
         // TODO: Add the reason of the incompatibility
         throw new SchemaBackwardIncompatibleException(
-                String.format("New schema: %s is not backward-compatible with 
the existing schema", schemaName));
+            String.format("New schema: %s is not backward-compatible with the 
existing schema", schemaName));
       }
     }
     ZKMetadataProvider.setSchema(_propertyStore, schema);
@@ -1706,18 +1732,18 @@ public class PinotHelixResourceManager {
       InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
       List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : 
instancePartitionsTypesToAssign) {
-        boolean hasPreConfiguredInstancePartitions = 
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
-            instancePartitionsType);
+        boolean hasPreConfiguredInstancePartitions =
+            TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType);
         InstancePartitions instancePartitions;
         if (!hasPreConfiguredInstancePartitions) {
           instancePartitions = 
instanceAssignmentDriver.assignInstances(instancePartitionsType, 
instanceConfigs, null);
           LOGGER.info("Persisting instance partitions: {}", 
instancePartitions);
           InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
         } else {
-          String referenceInstancePartitionsName =
-              
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
-          instancePartitions = 
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
-              referenceInstancePartitionsName, 
instancePartitionsType.getInstancePartitionsName(rawTableName));
+          String referenceInstancePartitionsName = 
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
+          instancePartitions =
+              
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, 
referenceInstancePartitionsName,
+                  
instancePartitionsType.getInstancePartitionsName(rawTableName));
           LOGGER.info("Persisting instance partitions: {} (referencing {})", 
instancePartitions,
               referenceInstancePartitionsName);
           InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
@@ -1919,7 +1945,6 @@ public class PinotHelixResourceManager {
 
     // Remove all stored segments for the table
 
-
     // Remove all stored segments for the table
     Long retentionPeriodMs = retentionPeriod != null ? 
TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
     _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, 
getSegmentsFor(realtimeTableName, false),
@@ -2037,8 +2062,8 @@ public class PinotHelixResourceManager {
       ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, 
-1);
       Map<String, Map<String, String>> controllerJobs = 
tableJobsRecord.getMapFields();
       return controllerJobs.entrySet().stream().filter(
-          job -> 
job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
-              
.equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+              job -> 
job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType))
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     } catch (ZkNoNodeException e) {
       LOGGER.warn("Could not find controller job node for table : {}", 
tableNameWithType, e);
     }
@@ -2060,10 +2085,8 @@ public class PinotHelixResourceManager {
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.toString());
-    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
-        Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
-        Integer.toString(numMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numMessagesSent));
     
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, 
segmentName);
     return addReloadJobToZK(jobId, jobMetadata);
   }
@@ -2079,12 +2102,9 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
-        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
-    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
-        Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
-        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numberOfMessagesSent));
     return addReloadJobToZK(jobId, jobMetadata);
   }
 
@@ -2096,14 +2116,10 @@ public class PinotHelixResourceManager {
       Map<String, Map<String, String>> tasks = 
tableJobsZnRecord.getMapFields();
       tasks.put(jobId, jobMetadata);
       if (tasks.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
-        tasks = tasks.entrySet().stream().sorted(new 
Comparator<Map.Entry<String, Map<String, String>>>() {
-              @Override
-              public int compare(Map.Entry<String, Map<String, String>> v1, 
Map.Entry<String, Map<String, String>> v2) {
-                return Long.compare(
-                    
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
-                    
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)));
-              }
-            }).collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
+        tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
+                
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+                
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+            .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
             .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
       }
       tableJobsZnRecord.setMapFields(tasks);
@@ -2281,8 +2297,7 @@ public class PinotHelixResourceManager {
     }
   }
 
-  public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata,
-      DateTimeFieldSpec timeColumnFieldSpec) {
+  public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata, 
DateTimeFieldSpec timeColumnFieldSpec) {
     ZKMetadataUtils.updateSegmentZKTimeInterval(segmentZKMetadata, 
timeColumnFieldSpec);
   }
 
@@ -2432,8 +2447,8 @@ public class PinotHelixResourceManager {
   private static Set<String> parseInstanceSet(IdealState idealState, String 
segmentName,
       @Nullable String targetInstance) {
     Set<String> instanceSet = idealState.getInstanceSet(segmentName);
-    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
-        "Could not find segment: %s in ideal state", segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), "Could 
not find segment: %s in ideal state",
+        segmentName);
     if (targetInstance != null) {
       return instanceSet.contains(targetInstance) ? 
Collections.singleton(targetInstance) : Collections.emptySet();
     } else {
@@ -2445,11 +2460,9 @@ public class PinotHelixResourceManager {
    * This util is similar to {@link HelixAdmin#resetPartition(String, String, 
String, List)}.
    * However instead of resetting only the ERROR state to its initial state. 
we reset all state regardless.
    */
-  private void resetPartitionAllState(String instanceName, String resourceName,
-      Set<String> resetPartitionNames) {
+  private void resetPartitionAllState(String instanceName, String 
resourceName, Set<String> resetPartitionNames) {
     LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster 
{}.",
-        resetPartitionNames == null ? "NULL" : resetPartitionNames, 
resourceName,
-        instanceName, _helixClusterName);
+        resetPartitionNames == null ? "NULL" : resetPartitionNames, 
resourceName, instanceName, _helixClusterName);
     HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -2470,20 +2483,19 @@ public class PinotHelixResourceManager {
 
     // get current state.
     String sessionId = liveInstance.getEphemeralOwner();
-    CurrentState curState =
-        accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, 
resourceName));
+    CurrentState curState = 
accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, 
resourceName));
 
     // check there is no pending messages for the partitions exist
     List<Message> messages = 
accessor.getChildValues(keyBuilder.messages(instanceName), true);
     for (Message message : messages) {
-      if 
(!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
 || !sessionId
-          .equals(message.getTgtSessionId()) || 
!resourceName.equals(message.getResourceName())
+      if 
(!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
 || !sessionId.equals(
+          message.getTgtSessionId()) || 
!resourceName.equals(message.getResourceName())
           || !resetPartitionNames.contains(message.getPartitionName())) {
         continue;
       }
-      throw new RuntimeException(String.format("Can't reset state for %s.%s on 
%s, "
-              + "because a pending message %s exists for resource %s", 
resourceName, resetPartitionNames, instanceName,
-          message.toString(), message.getResourceName()));
+      throw new RuntimeException(
+          String.format("Can't reset state for %s.%s on %s, because a pending 
message %s exists for resource %s",
+              resourceName, resetPartitionNames, instanceName, message, 
message.getResourceName()));
     }
 
     String adminName = null;
@@ -2495,8 +2507,8 @@ public class PinotHelixResourceManager {
       adminName = "UNKNOWN";
     }
 
-    List<Message> resetMessages = new ArrayList<Message>();
-    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
+    List<Message> resetMessages = new ArrayList<>();
+    List<PropertyKey> messageKeys = new ArrayList<>();
     for (String partitionName : resetPartitionNames) {
       // send currentState to initialState message
       String msgId = UUID.randomUUID().toString();
@@ -3113,9 +3125,9 @@ public class PinotHelixResourceManager {
       String instanceAdminEndpoint;
       try {
         instanceAdminEndpoint = _instanceAdminEndpointCache.get(instance);
-      } catch (ExecutionException e) {
+      } catch (Exception e) {
         String errorMessage =
-            String.format("ExecutionException when getting instance admin 
endpoint for instance: %s. Error message: %s",
+            String.format("Caught exception while getting instance admin 
endpoint for instance: %s. Error message: %s",
                 instance, e.getMessage());
         LOGGER.error(errorMessage, e);
         throw new InvalidConfigException(errorMessage);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 6d81f4d064..5e9def8a56 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -39,6 +39,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.lineage.LineageEntryState;
 import org.apache.pinot.common.lineage.SegmentLineage;
@@ -143,24 +144,57 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
   }
 
   @Test
-  public void testGetInstanceEndpoints()
+  public void testGetDataInstanceAdminEndpoints()
       throws Exception {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
-    BiMap<String, String> endpoints = 
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
 
-    // Check that we have endpoints for all instances.
-    assertEquals(endpoints.size(), NUM_SERVER_INSTANCES);
-
-    // Check actual endpoint names
-    for (Map.Entry<String, String> entry : endpoints.entrySet()) {
+    BiMap<String, String> adminEndpoints = 
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
+    assertEquals(adminEndpoints.size(), NUM_SERVER_INSTANCES);
+    for (Map.Entry<String, String> entry : adminEndpoints.entrySet()) {
       String key = entry.getKey();
       int port = Server.DEFAULT_ADMIN_API_PORT + 
Integer.parseInt(key.substring("Server_localhost_".length()));
       assertEquals(entry.getValue(), "http://localhost:"; + port);
     }
+
+    // Add a new server
+    String serverName = "Server_localhost_" + NUM_SERVER_INSTANCES;
+    Instance instance = new Instance("localhost", NUM_SERVER_INSTANCES, 
InstanceType.SERVER,
+        Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE), null, 0, 
12345, 0, 0, false);
+    _helixResourceManager.addInstance(instance, false);
+    adminEndpoints = 
_helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName));
+    assertEquals(adminEndpoints.size(), 1);
+    assertEquals(adminEndpoints.get(serverName), "http://localhost:12345";);
+
+    // Modify the admin port for the new added server
+    instance = new Instance("localhost", NUM_SERVER_INSTANCES, 
InstanceType.SERVER,
+        Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE), null, 0, 
23456, 0, 0, false);
+    _helixResourceManager.updateInstance(serverName, instance, false);
+    // Admin endpoint is updated through the instance config change callback, 
which happens asynchronously
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        BiMap<String, String> endpoints =
+            
_helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName));
+        assertEquals(endpoints.size(), 1);
+        return endpoints.get(serverName).equals("http://localhost:23456";);
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to update the admin port");
+
+    // Remove the new added server
+    _helixResourceManager.dropInstance(serverName);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        
_helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName));
+        return false;
+      } catch (InvalidConfigException e) {
+        return true;
+      }
+    }, 60_000L, "Failed to remove the admin endpoint");
   }
 
   @Test
-  public void testGetInstanceConfigs() {
+  public void testAddRemoveInstance() {
     Set<String> serverInstances = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     for (String instanceName : serverInstances) {
       InstanceConfig instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(instanceName);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to