jackjlli commented on a change in pull request #4047: Create leadControllerResource in helix cluster URL: https://github.com/apache/incubator-pinot/pull/4047#discussion_r291009909
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java ########## @@ -53,126 +55,217 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME; + /** * HelixSetupUtils handles how to create or get a helixCluster in controller. * * */ public class HelixSetupUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class); public static synchronized HelixManager setup(String helixClusterName, String zkPath, - String pinotControllerInstanceId, boolean isUpdateStateModel, boolean enableBatchMessageMode) { - + String pinotControllerInstanceId) { try { - createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode); + setupHelixCluster(helixClusterName, zkPath); } catch (final Exception e) { - LOGGER.error("Caught exception", e); + LOGGER.error("Caught exception when setting up Helix cluster: {}", helixClusterName, e); return null; } try { return startHelixControllerInStandadloneMode(helixClusterName, zkPath, pinotControllerInstanceId); } catch (final Exception e) { - LOGGER.error("Caught exception", e); + LOGGER.error("Caught exception when starting helix controller", e); return null; } } - public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel, - boolean enableBatchMessageMode) { + /** + * Set up a brand new Helix cluster if it doesn't exist. + */ + public static void setupHelixCluster(String helixClusterName, String zkPath) { final HelixAdmin admin = new ZKHelixAdmin(zkPath); - final String segmentStateModelName = - PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - if (admin.getClusters().contains(helixClusterName)) { - LOGGER.info("cluster already exists ********************************************* "); - if (isUpdateStateModel) { - final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName); - List<String> states = curStateModelDef.getStatesPriorityList(); - if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) { - LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName); - return; - } else { - LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName); - StateModelDefinition newStateModelDef = - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition(); - ZkClient zkClient = new ZkClient(zkPath); - zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS); - zkClient.setZkSerializer(new ZNRecordSerializer()); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef); - LOGGER.info("Completed updating statemodel {}", segmentStateModelName); - zkClient.close(); - } - } + LOGGER.info("Helix cluster: {} already exists", helixClusterName); return; } - - LOGGER.info("Creating a new cluster, as the helix cluster : " + helixClusterName - + " was not found ********************************************* "); + LOGGER.info("Creating a new Helix cluster: {}", helixClusterName); admin.addCluster(helixClusterName, false); + LOGGER.info("New Cluster: {} created.", helixClusterName); + } + + private static HelixManager startHelixControllerInStandadloneMode(String helixClusterName, String zkUrl, + String pinotControllerInstanceId) { + LOGGER.info("Starting Helix Standalone Controller ... "); + return HelixControllerMain.startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, + HelixControllerMain.STANDALONE); + } + + /** + * Customizes existing Helix cluster to run Pinot components. + */ + public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel, + boolean enableBatchMessageMode) { + final HelixAdmin admin = new ZKHelixAdmin(zkPath); + if (!admin.getClusters().contains(helixClusterName)) { + LOGGER.error("Helix cluster: {} hasn't been set up", helixClusterName); + throw new RuntimeException(); + } + + // Ensure auto join. + ensureAutoJoin(helixClusterName, admin); + + // Add segment state model definition if needed + addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel); + + // Add broker resource online offline state model definition if needed + addBrokerResourceOnlineOfflineStateModelDefinitionIfNeeded(helixClusterName, admin); Review comment: Merged. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org