klsince commented on code in PR #12039: URL: https://github.com/apache/pinot/pull/12039#discussion_r1409728866
########## pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java: ########## @@ -36,56 +40,91 @@ private ServiceStartableUtils() { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class); private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s"; + private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s"; private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all."; + private static final String PINOT_TENANT_LEVEL_CONFIG_KEY_PREFIX = "pinot.tenant."; private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s."; + public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName, + ServiceRole serviceRole) { + ZkClient zkClient = getZKClient(instanceConfig, zkAddress); + try { + applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole); + } finally { + zkClient.close(); + } + } + /** * Applies the ZK cluster config to the given instance config if it does not already exist. * * In the ZK cluster config: * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers */ - public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName, + public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName, ServiceRole serviceRole) { + ZNRecord clusterConfigZNRecord = + zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true); + if (clusterConfigZNRecord == null) { + LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName); + return; + } + + Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields(); + String instanceConfigKeyPrefix = + String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase()); + for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) { + String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length()); + addConfigIfNotExists(instanceConfig, instanceConfigKey, value); + } else { + // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and + // only put keys with the instance config key prefix. + addConfigIfNotExists(instanceConfig, key, value); + } + } + } + + public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) { int zkClientSessionConfig = instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG, CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS); int zkClientConnectionTimeoutMs = instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG, CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS); - ZkClient zkClient = new ZkClient.Builder() - .setZkSerializer(new ZNRecordSerializer()) - .setZkServer(zkAddress) - .setConnectionTimeout(zkClientConnectionTimeoutMs) - .setSessionTimeout(zkClientSessionConfig) - .build(); + ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress) + .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build(); zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS); + return zkClient; + } - try { - ZNRecord clusterConfigZNRecord = - zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true); - if (clusterConfigZNRecord == null) { - LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName); - return; - } + /** + * Overrides the instance config with the tenant configs if the tenant is tagged on the instance. + */ + public static void overrideTenantConfigs(String instanceId, ZkClient zkClient, String clusterName, Review Comment: call this `applyTenantConfig` to be a bit more consistent with the `applyClusterConfig`, and can extract the logic of getting `tenantsRelaxedNames` to a helper method to simplify `applyTenantConfig` method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org