Jackie-Jiang commented on a change in pull request #4047: Create 
leadControllerResource in helix cluster
URL: https://github.com/apache/incubator-pinot/pull/4047#discussion_r290974769
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 ##########
 @@ -53,126 +55,217 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
 
 /**
  * HelixSetupUtils handles how to create or get a helixCluster in controller.
  *
  *
  */
 public class HelixSetupUtils {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixSetupUtils.class);
 
   public static synchronized HelixManager setup(String helixClusterName, 
String zkPath,
-      String pinotControllerInstanceId, boolean isUpdateStateModel, boolean 
enableBatchMessageMode) {
-
+      String pinotControllerInstanceId) {
     try {
-      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, 
enableBatchMessageMode);
+      setupHelixCluster(helixClusterName, zkPath);
     } catch (final Exception e) {
-      LOGGER.error("Caught exception", e);
+      LOGGER.error("Caught exception when setting up Helix cluster: {}", 
helixClusterName, e);
       return null;
     }
 
     try {
       return startHelixControllerInStandadloneMode(helixClusterName, zkPath, 
pinotControllerInstanceId);
     } catch (final Exception e) {
-      LOGGER.error("Caught exception", e);
+      LOGGER.error("Caught exception when starting helix controller", e);
       return null;
     }
   }
 
-  public static void createHelixClusterIfNeeded(String helixClusterName, 
String zkPath, boolean isUpdateStateModel,
-      boolean enableBatchMessageMode) {
+  /**
+   * Set up a brand new Helix cluster if it doesn't exist.
+   */
+  public static void setupHelixCluster(String helixClusterName, String zkPath) 
{
     final HelixAdmin admin = new ZKHelixAdmin(zkPath);
-    final String segmentStateModelName =
-        
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-
     if (admin.getClusters().contains(helixClusterName)) {
-      LOGGER.info("cluster already exists 
********************************************* ");
-      if (isUpdateStateModel) {
-        final StateModelDefinition curStateModelDef = 
admin.getStateModelDef(helixClusterName, segmentStateModelName);
-        List<String> states = curStateModelDef.getStatesPriorityList();
-        if 
(states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE))
 {
-          LOGGER.info("State model {} already updated to contain CONSUMING 
state", segmentStateModelName);
-          return;
-        } else {
-          LOGGER.info("Updating {} to add states for low level consumers", 
segmentStateModelName);
-          StateModelDefinition newStateModelDef =
-              
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
-          ZkClient zkClient = new ZkClient(zkPath);
-          
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC,
 TimeUnit.SECONDS);
-          zkClient.setZkSerializer(new ZNRecordSerializer());
-          HelixDataAccessor accessor =
-              new ZKHelixDataAccessor(helixClusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
-          PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-          
accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), 
newStateModelDef);
-          LOGGER.info("Completed updating statemodel {}", 
segmentStateModelName);
-          zkClient.close();
-        }
-      }
+      LOGGER.info("Helix cluster: {} already exists", helixClusterName);
       return;
     }
-
-    LOGGER.info("Creating a new cluster, as the helix cluster : " + 
helixClusterName
-        + " was not found ********************************************* ");
+    LOGGER.info("Creating a new Helix cluster: {}", helixClusterName);
     admin.addCluster(helixClusterName, false);
+    LOGGER.info("New Cluster: {} created.", helixClusterName);
+  }
+
+  private static HelixManager startHelixControllerInStandadloneMode(String 
helixClusterName, String zkUrl,
+      String pinotControllerInstanceId) {
+    LOGGER.info("Starting Helix Standalone Controller ... ");
+    return HelixControllerMain.startHelixController(zkUrl, helixClusterName, 
pinotControllerInstanceId,
+        HelixControllerMain.STANDALONE);
+  }
+
+  /**
+   * Customizes existing Helix cluster to run Pinot components.
+   */
+  public static void setupPinotCluster(String helixClusterName, String zkPath, 
boolean isUpdateStateModel,
+      boolean enableBatchMessageMode) {
+    final HelixAdmin admin = new ZKHelixAdmin(zkPath);
+    if (!admin.getClusters().contains(helixClusterName)) {
+      LOGGER.error("Helix cluster: {} hasn't been set up", helixClusterName);
+      throw new RuntimeException();
+    }
+
+    // Ensure auto join.
+    ensureAutoJoin(helixClusterName, admin);
+
+    // Add segment state model definition if needed
+    addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, 
isUpdateStateModel);
+
+    // Add broker resource online offline state model definition if needed
+    
addBrokerResourceOnlineOfflineStateModelDefinitionIfNeeded(helixClusterName, 
admin);
 
-    LOGGER.info("Enable auto join.");
+    // Add broker resource if needed
+    createBrokerResourceIfNeeded(helixClusterName, admin, 
enableBatchMessageMode);
+
+    // Add lead controller resource if needed
+    createLeadControllerResourceIfNeeded(helixClusterName, admin, 
enableBatchMessageMode);
+
+    // Init property store if needed
+    initPropertyStoreIfNeeded(helixClusterName, zkPath);
+  }
+
+  private static void ensureAutoJoin(String helixClusterName, HelixAdmin 
admin) {
     final HelixConfigScope scope =
         new 
HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
+    String stateTransitionMaxThreads = MessageType.STATE_TRANSITION + "." + 
HelixTaskExecutor.MAX_THREADS;
+    List<String> keys = new ArrayList<>();
+    keys.add(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN);
+    keys.add(stateTransitionMaxThreads);
+    Map<String, String> configs = admin.getConfig(scope, keys);
+    if 
(!Boolean.TRUE.toString().equals(configs.get(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)))
 {
+      configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
Boolean.TRUE.toString());
+    }
+    if (!Integer.toString(1).equals(configs.get(stateTransitionMaxThreads))) {
+      configs.put(stateTransitionMaxThreads, String.valueOf(1));
+    }
+    admin.setConfig(scope, configs);
+  }
 
-    final Map<String, String> props = new HashMap<String, String>();
-    props.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
String.valueOf(true));
-    //we need only one segment to be loaded at a time
-    props.put(MessageType.STATE_TRANSITION + "." + 
HelixTaskExecutor.MAX_THREADS, String.valueOf(1));
-
-    admin.setConfig(scope, props);
-
-    LOGGER.info(
-        "Adding state model {} (with CONSUMED state) generated using {} 
**********************************************",
-        segmentStateModelName, 
PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-
-    // If this is a fresh cluster we are creating, then the cluster will see 
the CONSUMING state in the
-    // state model. But then the servers will never be asked to go to that 
STATE (whether they have the code
-    // to handle it or not) unil we complete the feature using low-level 
consumers and turn the feature on.
-    admin.addStateModelDef(helixClusterName, segmentStateModelName,
-        
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
-    LOGGER.info("Adding state model definition named : "
-        + 
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL
-        + " generated using : " + 
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()
-        + " ********************************************** ");
-
-    admin.addStateModelDef(helixClusterName,
-        
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
-        
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
-    LOGGER.info("Adding empty ideal state for Broker!");
-    HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
-        helixClusterName, admin);
-    IdealState idealState = PinotTableIdealStateBuilder
-        .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, 
enableBatchMessageMode);
-    admin.setResourceIdealState(helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
-    initPropertyStorePath(helixClusterName, zkPath);
-    LOGGER.info("New Cluster setup completed... 
********************************************** ");
+  private static void addSegmentStateModelDefinitionIfNeeded(String 
helixClusterName, HelixAdmin admin, String zkPath,
 
 Review comment:
   Most of this method exists only for the LLC (low-level consumer) migration. 
Since that migration is done, remove the redundant part (keep only lines 
156-168, and remove the redundant arguments).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to