klsince commented on code in PR #12039:
URL: https://github.com/apache/pinot/pull/12039#discussion_r1409728866


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +40,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = 
"/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TENANT_LEVEL_CONFIG_KEY_PREFIX = 
"pinot.tenant.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = 
"pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not 
already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. 
pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, 
ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = 
clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, 
serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + 
key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider 
standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String 
zkAddress) {
     int zkClientSessionConfig =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new 
ZNRecordSerializer()).setZkServer(zkAddress)
+        
.setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, 
TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, 
clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping 
applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is 
tagged on the instance.
+   */
+  public static void overrideTenantConfigs(String instanceId, ZkClient 
zkClient, String clusterName,

Review Comment:
   call this `applyTenantConfig` to be a bit more consistent with the 
`applyClusterConfig`, and can extract the logic of getting 
`tenantsRelaxedNames` to a helper method to simplify `applyTenantConfig` method
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to