Jackie-Jiang commented on code in PR #12039:
URL: https://github.com/apache/pinot/pull/12039#discussion_r1432001044


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = 
"pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not 
already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. 
pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = 
clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, 
serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + 
key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider 
standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String 
zkAddress) {
     int zkClientSessionConfig =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new 
ZNRecordSerializer()).setZkServer(zkAddress)
+        
.setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, 
TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is 
tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, 
String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping 
overriding tenant configs", instanceId);
+      return;
+    }
+    InstanceConfig instanceZKConfig = new 
InstanceConfig(instanceConfigZNRecord);
+    Set<String> instanceTags = instanceZKConfig.getTags().stream()
+        .map(PinotConfiguration::relaxPropertyName)
+        .collect(Collectors.toSet());
 
-      Map<String, String> clusterConfigs = 
clusterConfigZNRecord.getSimpleFields();
-      String instanceConfigKeyPrefix =
-          String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, 
serviceRole.name().toLowerCase());
-      for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
-          String instanceConfigKey = instanceConfigKeyPrefix + 
key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
-          addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
-        } else {
-          // TODO: Currently it puts all keys to the instance config. Consider 
standardizing instance config keys and
-          //       only put keys with the instance config key prefix.
-          addConfigIfNotExists(instanceConfig, key, value);
+    for (String key : instanceConfig.getKeys()) {
+      if (key.startsWith(PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = 
key.substring(PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX.length());
+        String tag = instanceConfigKey.substring(0, 
instanceConfigKey.indexOf('.'));
+        String tagKey = instanceConfigKey.substring(tag.length() + 1);
+        if (instanceTags.contains(tag)) {
+          instanceConfig.setProperty(tagKey, instanceConfig.getProperty(key));

Review Comment:
   Do we need the complete key here? Or we just need the part after 
`pinot.<type>`?
   E.g. Do we want `pinot.tag.myTenant_OFFLINE.pinot.server.query.executor.abc` 
or `pinot.tag.myTenant_OFFLINE.query.executor.abc`?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = 
"pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not 
already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. 
pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = 
clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, 
serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + 
key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider 
standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String 
zkAddress) {
     int zkClientSessionConfig =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new 
ZNRecordSerializer()).setZkServer(zkAddress)
+        
.setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, 
TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is 
tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, 
String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping 
overriding tenant configs", instanceId);

Review Comment:
   This is common when the node joins the cluster for the first time. We 
probably want to log info instead



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -135,34 +136,44 @@ public void init(PinotConfiguration brokerConf)
     // Remove all white-spaces from the list of zkServers (if any).
     _zkServers = 
brokerConf.getProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER).replaceAll("\\s+", "");
     _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
-    ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, 
_clusterName, ServiceRole.BROKER);
+    ZkClient zkClient = ServiceStartableUtils.getZKClient(_brokerConf, 
_zkServers);
+    try {
+      ServiceStartableUtils.applyClusterConfig(_brokerConf, zkClient, 
_clusterName, ServiceRole.BROKER);
+      _hostname = brokerConf.getProperty(Broker.CONFIG_OF_BROKER_HOSTNAME);
+      if (_hostname == null) {
+        _hostname =
+            _brokerConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, 
false) ? NetUtils.getHostnameOrAddress()
+                : NetUtils.getHostAddress();
+      }
+
+      if 
(_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+          MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) {
+        
_brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, 
NetUtils.findOpenPort());
+      }
+      _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf);
+
+      // Override multi-stage query runner hostname if not set explicitly
+      if 
(!_brokerConf.containsKey(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME)) {
+        
_brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, 
_hostname);
+      }
+      _port = _listenerConfigs.get(0).getPort();
+
+      _instanceId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID);
+      if (_instanceId == null) {
+        _instanceId = _brokerConf.getProperty(Helix.Instance.INSTANCE_ID_KEY);
+      }
+      if (_instanceId == null) {
+        _instanceId = Helix.PREFIX_OF_BROKER_INSTANCE + _hostname + "_" + 
_port;
+      }
 
-    if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
-        MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) {
-      _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, 
NetUtils.findOpenPort());
+      ServiceStartableUtils.applyTenantConfigs(_instanceId, zkClient, 
_clusterName, _brokerConf);

Review Comment:
   Since this requires reading the `InstanceConfig` (automatically created when 
the node joins the cluster for the first time) from ZK, we can consider moving 
it into `start()` after the instance is connected to the cluster. At that 
moment, we don't need to create another `ZkClient`.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = 
"pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not 
already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. 
pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = 
clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, 
serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + 
key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider 
standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String 
zkAddress) {
     int zkClientSessionConfig =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new 
ZNRecordSerializer()).setZkServer(zkAddress)
+        
.setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, 
TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is 
tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, 
String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping 
overriding tenant configs", instanceId);
+      return;
+    }
+    InstanceConfig instanceZKConfig = new 
InstanceConfig(instanceConfigZNRecord);
+    Set<String> instanceTags = instanceZKConfig.getTags().stream()
+        .map(PinotConfiguration::relaxPropertyName)

Review Comment:
   Why do we want to relax property name here? This is ZK config, which should 
have the exact name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to