Jackie-Jiang commented on a change in pull request #4047: Create leadControllerResource in helix cluster URL: https://github.com/apache/incubator-pinot/pull/4047#discussion_r291014830
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java ########## @@ -53,126 +55,217 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME; + /** * HelixSetupUtils handles how to create or get a helixCluster in controller. * * */ public class HelixSetupUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class); public static synchronized HelixManager setup(String helixClusterName, String zkPath, - String pinotControllerInstanceId, boolean isUpdateStateModel, boolean enableBatchMessageMode) { - + String pinotControllerInstanceId) { try { - createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode); + setupHelixCluster(helixClusterName, zkPath); } catch (final Exception e) { - LOGGER.error("Caught exception", e); + LOGGER.error("Caught exception when setting up Helix cluster: {}", helixClusterName, e); return null; } try { return startHelixControllerInStandadloneMode(helixClusterName, zkPath, pinotControllerInstanceId); } catch (final Exception e) { - LOGGER.error("Caught exception", e); + LOGGER.error("Caught exception when starting helix controller", e); return null; } } - public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel, - boolean enableBatchMessageMode) { + /** + * Set up a brand new Helix cluster if it doesn't exist. + */ + public static void setupHelixCluster(String helixClusterName, String zkPath) { final HelixAdmin admin = new ZKHelixAdmin(zkPath); - final String segmentStateModelName = - PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - if (admin.getClusters().contains(helixClusterName)) { - LOGGER.info("cluster already exists ********************************************* "); - if (isUpdateStateModel) { - final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName); - List<String> states = curStateModelDef.getStatesPriorityList(); - if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) { - LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName); - return; - } else { - LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName); - StateModelDefinition newStateModelDef = - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition(); - ZkClient zkClient = new ZkClient(zkPath); - zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS); - zkClient.setZkSerializer(new ZNRecordSerializer()); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef); - LOGGER.info("Completed updating statemodel {}", segmentStateModelName); - zkClient.close(); - } - } + LOGGER.info("Helix cluster: {} already exists", helixClusterName); return; } - - LOGGER.info("Creating a new cluster, as the helix cluster : " + helixClusterName - + " was not found ********************************************* "); + LOGGER.info("Creating a new Helix cluster: {}", helixClusterName); admin.addCluster(helixClusterName, false); + LOGGER.info("New Cluster: {} created.", helixClusterName); + } + + private static HelixManager startHelixControllerInStandadloneMode(String helixClusterName, String zkUrl, + String pinotControllerInstanceId) { + LOGGER.info("Starting Helix Standalone Controller ... "); + return HelixControllerMain.startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, + HelixControllerMain.STANDALONE); + } + + /** + * Customizes existing Helix cluster to run Pinot components. + */ + public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel, + boolean enableBatchMessageMode) { + final HelixAdmin admin = new ZKHelixAdmin(zkPath); + if (!admin.getClusters().contains(helixClusterName)) { + LOGGER.error("Helix cluster: {} hasn't been set up", helixClusterName); + throw new RuntimeException(); + } + + // Ensure auto join. + ensureAutoJoin(helixClusterName, admin); + + // Add segment state model definition if needed + addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel); + + // Add broker resource online offline state model definition if needed + addBrokerResourceOnlineOfflineStateModelDefinitionIfNeeded(helixClusterName, admin); - LOGGER.info("Enable auto join."); + // Add broker resource if needed + createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode); + + // Add lead controller resource if needed + createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode); + + // Init property store if needed + initPropertyStoreIfNeeded(helixClusterName, zkPath); + } + + private static void ensureAutoJoin(String helixClusterName, HelixAdmin admin) { final HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build(); + String stateTransitionMaxThreads = MessageType.STATE_TRANSITION + "." + HelixTaskExecutor.MAX_THREADS; + List<String> keys = new ArrayList<>(); + keys.add(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN); + keys.add(stateTransitionMaxThreads); + Map<String, String> configs = admin.getConfig(scope, keys); + if (!Boolean.TRUE.toString().equals(configs.get(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN))) { + configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.TRUE.toString()); + } + if (!Integer.toString(1).equals(configs.get(stateTransitionMaxThreads))) { + configs.put(stateTransitionMaxThreads, String.valueOf(1)); + } + admin.setConfig(scope, configs); + } - final Map<String, String> props = new HashMap<String, String>(); - props.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true)); - //we need only one segment to be loaded at a time - props.put(MessageType.STATE_TRANSITION + "." + HelixTaskExecutor.MAX_THREADS, String.valueOf(1)); - - admin.setConfig(scope, props); - - LOGGER.info( - "Adding state model {} (with CONSUMED state) generated using {} **********************************************", - segmentStateModelName, PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString()); - - // If this is a fresh cluster we are creating, then the cluster will see the CONSUMING state in the - // state model. But then the servers will never be asked to go to that STATE (whether they have the code - // to handle it or not) unil we complete the feature using low-level consumers and turn the feature on. - admin.addStateModelDef(helixClusterName, segmentStateModelName, - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); - - LOGGER.info("Adding state model definition named : " - + PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL - + " generated using : " + PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString() - + " ********************************************** "); - - admin.addStateModelDef(helixClusterName, - PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL, - PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); - - LOGGER.info("Adding empty ideal state for Broker!"); - HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, - helixClusterName, admin); - IdealState idealState = PinotTableIdealStateBuilder - .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode); - admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState); - initPropertyStorePath(helixClusterName, zkPath); - LOGGER.info("New Cluster setup completed... ********************************************** "); + private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath, Review comment: But since LLC is done, we can (and we should) always update state model right? Also, the comments need to be updated. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org