This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ce91bddd [Query Resource Isolation] Workload Configs (#15109)
26ce91bddd is described below

commit 26ce91bdddfde13cc78fb960477f56f21662d2a1
Author: Praveen <praveenkchagan...@gmail.com>
AuthorDate: Fri Jun 13 09:28:19 2025 -0700

    [Query Resource Isolation] Workload Configs (#15109)
    
    * Workload Configs
    
    * workload config
    
    * Add API
    
    * config
    
    * Change config structure
    
    * Propagation strategy
    
    * Fix style check
    
    * Cost spliting on update
    
    * Table addition propagation
    
    * perf
    
    * Tests
    
    * test
    
    * test 2
    
    * Review comments 1
    
    * review comments 3
    
    * review comments 3
    
    * name change
    
    * review comments 4
---
 .../BrokerUserDefinedMessageHandlerFactory.java    |  31 ++
 .../messages/QueryWorkloadRefreshMessage.java      |  69 +++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |  57 ++++
 .../utils/config/QueryWorkloadConfigUtils.java     | 234 +++++++++++++++
 .../utils/config/QueryWorkloadConfigUtilsTest.java | 199 ++++++++++++
 .../pinot/controller/api/resources/Constants.java  |   1 +
 .../PinotQueryWorkloadRestletResource.java         | 332 +++++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  61 +++-
 .../controller/workload/QueryWorkloadManager.java  | 203 +++++++++++++
 .../workload/scheme/DefaultPropagationScheme.java  |  51 ++++
 .../workload/scheme/PropagationScheme.java         |  36 +++
 .../workload/scheme/PropagationSchemeProvider.java |  42 +++
 .../workload/scheme/PropagationUtils.java          | 211 +++++++++++++
 .../workload/scheme/TablePropagationScheme.java    |  78 +++++
 .../workload/scheme/TenantPropagationScheme.java   |  72 +++++
 .../controller/workload/splitter/CostSplitter.java |  48 +++
 .../workload/splitter/DefaultCostSplitter.java     |  51 ++++
 .../controller/workload/PropagationUtilsTest.java  | 214 +++++++++++++
 .../java/org/apache/pinot/core/auth/Actions.java   |   4 +
 .../server/starter/helix/BaseServerStarter.java    |   6 +
 .../helix/QueryWorkloadMessageHandlerFactory.java  |  87 ++++++
 .../spi/config/workload/EnforcementProfile.java    |  78 +++++
 .../pinot/spi/config/workload/InstanceCost.java    |  79 +++++
 .../pinot/spi/config/workload/NodeConfig.java      | 126 ++++++++
 .../spi/config/workload/PropagationScheme.java     | 158 ++++++++++
 .../spi/config/workload/QueryWorkloadConfig.java   | 109 +++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 27 files changed, 2637 insertions(+), 1 deletion(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 81ea3d0d4f..1b2e7ed045 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -28,10 +28,12 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
 import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
 import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
 import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +73,9 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
         return new RefreshDatabaseConfigMessageHandler(new 
DatabaseConfigRefreshMessage(message), context);
       case ApplicationQpsQuotaRefreshMessage.REFRESH_APP_QUOTA_MSG_SUB_TYPE:
         return new RefreshApplicationQpsQuotaMessageHandler(new 
ApplicationQpsQuotaRefreshMessage(message), context);
+      case QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE:
+      case QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE:
+        return new QueryWorkloadRefreshMessageHandler(new 
QueryWorkloadRefreshMessage(message), context);
       default:
         // NOTE: Log a warning and return no-op message handler for 
unsupported message sub-types. This can happen when
         //       a new message sub-type is added, and the sender gets deployed 
first while receiver is still running the
@@ -259,4 +264,30 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
       LOGGER.error("Got error for no-op message handling (error code: {}, 
error type: {})", code, type, e);
     }
   }
+
+  private static class QueryWorkloadRefreshMessageHandler extends 
MessageHandler {
+    final String _queryWorkloadName;
+    final InstanceCost _instanceCost;
+
+    QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage 
queryWorkloadRefreshMessage,
+        NotificationContext context) {
+      super(queryWorkloadRefreshMessage, context);
+      _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
+      _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      // TODO: Add logic to invoke the query workload manager to 
refresh/delete the query workload config
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) 
{
+      LOGGER.error("Got error while refreshing query workload config for query 
workload: {} (error code: {},"
+          + " error type: {})", _queryWorkloadName, errorCode, errorType, e);
+    }
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
new file mode 100644
index 0000000000..85f4da123e
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+
+
+/**
+ * Message to refresh the query workload on the instances.
+ * This message include the host level cost for each instance.
+ */
+public class QueryWorkloadRefreshMessage extends Message {
+  public static final String REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE = 
"REFRESH_QUERY_WORKLOAD";
+  public static final String DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE = 
"DELETE_QUERY_WORKLOAD";
+  public static final String QUERY_WORKLOAD_NAME = "queryWorkloadName";
+  public static final String INSTANCE_COST = "instanceCost";
+
+  /**
+   * Constructor for the sender.
+   */
+  public QueryWorkloadRefreshMessage(String queryWorkloadName, String 
messageSubType,
+                                     @Nullable InstanceCost instanceCost) {
+    super(Message.MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(messageSubType);
+    // Give it infinite time to process the message, as long as session is 
alive
+    setExecutionTimeout(-1);
+    QueryWorkloadConfigUtils.updateZNRecordWithInstanceCost(getRecord(), 
queryWorkloadName, instanceCost);
+  }
+
+  /**
+   * Constructor for the receiver.
+   */
+  public QueryWorkloadRefreshMessage(Message message) {
+    super(message.getRecord());
+    if (!message.getMsgSubType().equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
+        || 
!message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+      throw new IllegalArgumentException("Unknown message subtype:" + 
message.getMsgSubType());
+    }
+  }
+
+  public String getQueryWorkloadName() {
+    return getRecord().getSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME);
+  }
+
+  public InstanceCost getInstanceCost() {
+    return QueryWorkloadConfigUtils.getInstanceCostFromZNRecord(getRecord());
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index dfb2bbe401..cfb4ad4546 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -43,12 +43,14 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.LogicalTableConfigUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
+import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.ConfigUtils;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -82,6 +84,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = 
"/CONFIGS/CLUSTER";
   private static final String PROPERTYSTORE_SEGMENT_LINEAGE = 
"/SEGMENT_LINEAGE";
   private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = 
"/MINION_TASK_METADATA";
+  private static final String PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX = 
"/CONFIGS/QUERYWORKLOAD";
 
   public static void setUserConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String username, ZNRecord znRecord) {
     propertyStore.set(constructPropertyStorePathForUserConfig(username), 
znRecord, AccessOption.PERSISTENT);
@@ -305,6 +308,14 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, 
tableNameWithType);
   }
 
+  public static String getPropertyStoreWorkloadConfigsPrefix() {
+    return PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX;
+  }
+
+  public static String constructPropertyStorePathForQueryWorkloadConfig(String 
workloadName) {
+    return StringUtil.join("/", PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX, 
workloadName);
+  }
+
   @Deprecated
   public static String 
constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
       String tableNameWithType) {
@@ -837,6 +848,52 @@ public class ZKMetadataProvider {
     }
   }
 
+  public static List<QueryWorkloadConfig> 
getAllQueryWorkloadConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    List<ZNRecord> znRecords =
+        propertyStore.getChildren(getPropertyStoreWorkloadConfigsPrefix(), 
null, AccessOption.PERSISTENT,
+            CommonConstants.Helix.ZkClient.RETRY_COUNT, 
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+    if (znRecords == null) {
+      return Collections.emptyList();
+    }
+    int numZNRecords = znRecords.size();
+    List<QueryWorkloadConfig> queryWorkloadConfigs = new 
ArrayList<>(numZNRecords);
+    for (ZNRecord znRecord : znRecords) {
+      
queryWorkloadConfigs.add(QueryWorkloadConfigUtils.fromZNRecord(znRecord));
+    }
+    return queryWorkloadConfigs;
+  }
+
+  @Nullable
+  public static QueryWorkloadConfig 
getQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String workloadName) {
+    ZNRecord znRecord = 
propertyStore.get(constructPropertyStorePathForQueryWorkloadConfig(workloadName),
+        null, AccessOption.PERSISTENT);
+    if (znRecord == null) {
+      return null;
+    }
+    return QueryWorkloadConfigUtils.fromZNRecord(znRecord);
+  }
+
+  public static boolean setQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore,
+      QueryWorkloadConfig queryWorkloadConfig) {
+    String path = 
constructPropertyStorePathForQueryWorkloadConfig(queryWorkloadConfig.getQueryWorkloadName());
+    boolean isNewConfig = !propertyStore.exists(path, AccessOption.PERSISTENT);
+    ZNRecord znRecord = isNewConfig ? new 
ZNRecord(queryWorkloadConfig.getQueryWorkloadName())
+        : propertyStore.get(path, null, AccessOption.PERSISTENT);
+    // Update the record with new workload configuration
+    QueryWorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord, 
queryWorkloadConfig);
+    // Create or update based on existence
+    return isNewConfig ? propertyStore.create(path, znRecord, 
AccessOption.PERSISTENT)
+        : propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
+  }
+
+  public static void deleteQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String workloadName) {
+    String propertyStorePath = 
constructPropertyStorePathForQueryWorkloadConfig(workloadName);
+    if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
+      propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
+    }
+  }
+
   public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore,
       LogicalTableConfig logicalTableConfig) {
     try {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java
new file mode 100644
index 0000000000..034e4724f0
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.config;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.common.utils.http.HttpClientConfig;
+import org.apache.pinot.common.utils.tls.TlsUtils;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+
+
+public class QueryWorkloadConfigUtils {
+  private QueryWorkloadConfigUtils() {
+  }
+
+  private static final Logger LOGGER = 
org.slf4j.LoggerFactory.getLogger(QueryWorkloadConfigUtils.class);
+  private static final HttpClient HTTP_CLIENT = new 
HttpClient(HttpClientConfig.DEFAULT_HTTP_CLIENT_CONFIG,
+          TlsUtils.getSslContext());
+
+  /**
+   * Converts a ZNRecord into a QueryWorkloadConfig object by extracting 
mapFields.
+   *
+   * @param znRecord The ZNRecord containing workload config data.
+   * @return A QueryWorkloadConfig object.
+   */
+  public static QueryWorkloadConfig fromZNRecord(ZNRecord znRecord) {
+    Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
+    String queryWorkloadName = 
znRecord.getSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME);
+    Preconditions.checkNotNull(queryWorkloadName, "queryWorkloadName cannot be 
null");
+    String nodeConfigsJson = 
znRecord.getSimpleField(QueryWorkloadConfig.NODE_CONFIGS);
+    Preconditions.checkNotNull(nodeConfigsJson, "nodeConfigs cannot be null");
+    try {
+      List<NodeConfig> nodeConfigs = JsonUtils.stringToObject(nodeConfigsJson, 
new TypeReference<>() { });
+      return new QueryWorkloadConfig(queryWorkloadName, nodeConfigs);
+    } catch (Exception e) {
+      String errorMessage = String.format("Failed to convert ZNRecord : %s to 
QueryWorkloadConfig", znRecord);
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  /**
+   * Updates a ZNRecord with the fields from a WorkloadConfig object.
+   *
+   * @param queryWorkloadConfig The QueryWorkloadConfig object to convert.
+   * @param znRecord The ZNRecord to update.
+   */
+  public static void updateZNRecordWithWorkloadConfig(ZNRecord znRecord, 
QueryWorkloadConfig queryWorkloadConfig) {
+    znRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
queryWorkloadConfig.getQueryWorkloadName());
+    try {
+      znRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
+          JsonUtils.objectToString(queryWorkloadConfig.getNodeConfigs()));
+    } catch (Exception e) {
+      String errorMessage = String.format("Failed to convert 
QueryWorkloadConfig : %s to ZNRecord",
+          queryWorkloadConfig);
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  public static void updateZNRecordWithInstanceCost(ZNRecord znRecord, String 
queryWorkloadName,
+      InstanceCost instanceCost) {
+    Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
+    Preconditions.checkNotNull(instanceCost, "InstanceCost cannot be null");
+    try {
+      znRecord.setSimpleField(QueryWorkloadRefreshMessage.QUERY_WORKLOAD_NAME, 
queryWorkloadName);
+      znRecord.setSimpleField(QueryWorkloadRefreshMessage.INSTANCE_COST, 
JsonUtils.objectToString(instanceCost));
+    } catch (Exception e) {
+      String errorMessage = String.format("Failed to convert InstanceCost : %s 
to ZNRecord",
+          instanceCost);
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  public static InstanceCost getInstanceCostFromZNRecord(ZNRecord znRecord) {
+    Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
+    String instanceCostJson = 
znRecord.getSimpleField(QueryWorkloadRefreshMessage.INSTANCE_COST);
+    Preconditions.checkNotNull(instanceCostJson, "InstanceCost cannot be 
null");
+    try {
+      return JsonUtils.stringToObject(instanceCostJson, InstanceCost.class);
+    } catch (Exception e) {
+      String errorMessage = String.format("Failed to convert ZNRecord : %s to 
InstanceCost", znRecord);
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+  /**
+   * Fetches query workload configs for a specific instance from the 
controller.
+   *
+   * @param controllerUrl The URL of the controller.
+   * @param instanceId The ID of the instance to fetch configs for.
+   * @param nodeType The type of node (e.g., BROKER, SERVER).
+   * @return A map of workload names to their corresponding InstanceCost 
objects.
+   */
+  public static Map<String, InstanceCost> 
getQueryWorkloadConfigsFromController(String controllerUrl, String instanceId,
+                                                                               
 NodeConfig.Type nodeType) {
+    try {
+      if (controllerUrl == null || controllerUrl.isEmpty()) {
+        LOGGER.warn("Controller URL is empty, cannot fetch query workload 
configs for instance: {}", instanceId);
+        return Collections.emptyMap();
+      }
+      URI queryWorkloadURI = new URI(controllerUrl + 
"/queryWorkloadConfigs/instance/" + instanceId + "?nodeType="
+              + nodeType);
+      ClassicHttpRequest request = ClassicRequestBuilder.get(queryWorkloadURI)
+              .setVersion(HttpVersion.HTTP_1_1)
+              .setHeader(HttpHeaders.CONTENT_TYPE, 
HttpClient.JSON_CONTENT_TYPE)
+              .build();
+      AtomicReference<Map<String, InstanceCost>> workloadToInstanceCost = new 
AtomicReference<>(null);
+      RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(3, 
3000L, 1.2f);
+      retryPolicy.attempt(() -> {
+        try {
+          SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
+                  HTTP_CLIENT.sendRequest(request, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS)
+          );
+          if (response.getStatusCode() == HttpStatus.SC_OK) {
+            
workloadToInstanceCost.set(JsonUtils.stringToObject(response.getResponse(), new 
TypeReference<>() { }));
+            LOGGER.info("Successfully fetched query workload configs from 
controller: {}, Instance: {}",
+                    controllerUrl, instanceId);
+            return true;
+          } else if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+            LOGGER.info("No query workload configs found for controller: {}, 
Instance: {}", controllerUrl, instanceId);
+            workloadToInstanceCost.set(Collections.emptyMap());
+            return true;
+          } else {
+            LOGGER.warn("Failed to fetch query workload configs from 
controller: {}, Instance: {}, Status: {}",
+                    controllerUrl, instanceId, response.getStatusCode());
+            return false;
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Failed to fetch query workload configs from controller: 
{}, Instance: {}",
+                  controllerUrl, instanceId, e);
+          return false;
+        }
+      });
+      return workloadToInstanceCost.get();
+    } catch (Exception e) {
+      LOGGER.warn("Failed to fetch query workload configs from controller: {}, 
Instance: {}",
+              controllerUrl, instanceId, e);
+      return Collections.emptyMap();
+    }
+  }
+
+  /**
+   * Validates the given QueryWorkloadConfig and returns a list of validation 
error messages.
+   *
+   * @param config the QueryWorkloadConfig to validate
+   * @return a list of validation errors; empty if config is valid
+   */
+  public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig 
config) {
+    List<String> errors = new ArrayList<>();
+    if (config == null) {
+      errors.add("QueryWorkloadConfig cannot be null");
+      return errors;
+    }
+    String name = config.getQueryWorkloadName();
+    if (name == null || name.trim().isEmpty()) {
+      errors.add("queryWorkloadName cannot be null or empty");
+    }
+    List<NodeConfig> nodeConfigs = config.getNodeConfigs();
+    if (nodeConfigs == null || nodeConfigs.isEmpty()) {
+      errors.add("nodeConfigs cannot be null or empty");
+    } else {
+      for (int i = 0; i < nodeConfigs.size(); i++) {
+        NodeConfig nodeConfig = nodeConfigs.get(i);
+        String prefix = "nodeConfigs[" + i + "]";
+        if (nodeConfig == null) {
+          errors.add(prefix + " cannot be null");
+          continue;
+        }
+        if (nodeConfig.getNodeType() == null) {
+          errors.add(prefix + ".type cannot be null");
+        }
+        // Validate EnforcementProfile
+        EnforcementProfile enforcementProfile = 
nodeConfig.getEnforcementProfile();
+        if (enforcementProfile == null) {
+          errors.add(prefix + "enforcementProfile cannot be null");
+        } else {
+           if (enforcementProfile.getCpuCostNs() < 0) {
+             errors.add(prefix + ".enforcementProfile.cpuCostNs cannot be 
negative");
+           }
+           if (enforcementProfile.getMemoryCostBytes() < 0) {
+               errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot 
be negative");
+           }
+        }
+        // Validate PropagationScheme
+        PropagationScheme propagationScheme = 
nodeConfig.getPropagationScheme();
+        if (propagationScheme == null) {
+          errors.add(prefix + ".propagationScheme cannot be null");
+        } else {
+          if (propagationScheme.getPropagationType() == null) {
+            errors.add(prefix + ".propagationScheme.type cannot be null");
+          }
+        }
+      }
+    }
+    return errors;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java
new file mode 100644
index 0000000000..0bbe84b7fe
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.config;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class QueryWorkloadConfigUtilsTest {
+
+  @Test(dataProvider = "fromZNRecordDataProvider")
+  public void testFromZNRecord(ZNRecord znRecord, QueryWorkloadConfig 
expectedQueryWorkloadConfig, boolean shouldFail) {
+    try {
+      QueryWorkloadConfig actualQueryWorkloadConfig = 
QueryWorkloadConfigUtils.fromZNRecord(znRecord);
+      if (shouldFail) {
+        Assert.fail("Expected an exception but none was thrown");
+      }
+      Assert.assertEquals(actualQueryWorkloadConfig, 
expectedQueryWorkloadConfig);
+    } catch (Exception e) {
+      if (!shouldFail) {
+        Assert.fail("Caught unexpected exception: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  @DataProvider(name = "fromZNRecordDataProvider")
+  public Object[][] fromZNRecordDataProvider() throws JsonProcessingException {
+    List<Object[]> data = new ArrayList<>();
+
+    // Shared, valid configuration
+    EnforcementProfile validEnforcementProfile = new EnforcementProfile(100, 
100);
+
+    // Server node
+    PropagationScheme serverPropagationScheme = new 
PropagationScheme(PropagationScheme.Type.TABLE,
+        List.of("value1", "value2"));
+    NodeConfig serverNodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE, 
validEnforcementProfile,
+        serverPropagationScheme);
+
+    // Broker node
+    PropagationScheme brokerPropagationScheme = new 
PropagationScheme(PropagationScheme.Type.TENANT,
+        List.of("value3", "value4"));
+    NodeConfig brokerNodeConfig = new NodeConfig(NodeConfig.Type.BROKER_NODE, 
validEnforcementProfile,
+        brokerPropagationScheme);
+
+    List<NodeConfig> nodeConfigs = List.of(serverNodeConfig, brokerNodeConfig);
+    QueryWorkloadConfig validQueryWorkloadConfig = new 
QueryWorkloadConfig("workloadId", nodeConfigs);
+
+    // Valid scenario: NODE_CONFIGS field is a JSON array string
+    ZNRecord validZnRecord = new ZNRecord("workloadId");
+    validZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    validZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
JsonUtils.objectToString(nodeConfigs));
+    data.add(new Object[] { validZnRecord, validQueryWorkloadConfig, false });
+
+    // Null propagation scheme
+    NodeConfig nodeConfigWithoutPropagationScheme = new 
NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile,
+            null);
+    List<NodeConfig> nodeConfigsWithoutPropagation = 
List.of(nodeConfigWithoutPropagationScheme);
+    ZNRecord znRecordNullPropagation = new ZNRecord("workloadId");
+    
znRecordNullPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    znRecordNullPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
+        JsonUtils.objectToString(nodeConfigsWithoutPropagation));
+    QueryWorkloadConfig expectedQueryWorkloadConfigNullPropagation = new 
QueryWorkloadConfig("workloadId",
+        nodeConfigsWithoutPropagation);
+    data.add(new Object[] { znRecordNullPropagation, 
expectedQueryWorkloadConfigNullPropagation, false });
+
+    // Missing NODE_CONFIGS field
+    ZNRecord missingNodeConfigsZnRecord = new ZNRecord("workloadId");
+    
missingNodeConfigsZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
 "workloadId");
+    data.add(new Object[] { missingNodeConfigsZnRecord, null, true });
+
+    // Invalid JSON in NODE_CONFIGS field
+    ZNRecord invalidJsonZnRecord = new ZNRecord("workloadId");
+    
invalidJsonZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    invalidJsonZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
"{invalidJsonField: }");
+    data.add(new Object[] { invalidJsonZnRecord, null, true });
+
+    return data.toArray(new Object[0][]);
+  }
+
+  @Test(dataProvider = "updateZNRecordDataProvider")
+  public void testUpdateZNRecordWithWorkloadConfig(QueryWorkloadConfig 
queryWorkloadConfig, ZNRecord znRecord,
+      ZNRecord expectedZnRecord, boolean shouldFail) {
+    try {
+      QueryWorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord, 
queryWorkloadConfig);
+      if (shouldFail) {
+        Assert.fail("Expected an exception but none was thrown");
+      }
+      Assert.assertEquals(znRecord, expectedZnRecord);
+    } catch (Exception e) {
+      if (!shouldFail) {
+        Assert.fail("Caught unexpected exception: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  @DataProvider(name = "updateZNRecordDataProvider")
+  public Object[][] updateZNRecordDataProvider() throws 
JsonProcessingException {
+    List<Object[]> data = new ArrayList<>();
+
+    EnforcementProfile validEnforcementProfile = new EnforcementProfile(100, 
100);
+    // Server scheme
+    PropagationScheme serverPropagationScheme = new 
PropagationScheme(PropagationScheme.Type.TABLE,
+        List.of("value1", "value2"));
+    NodeConfig serverNodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE, 
validEnforcementProfile,
+        serverPropagationScheme);
+    // Broker scheme
+    PropagationScheme brokerPropagationScheme = new 
PropagationScheme(PropagationScheme.Type.TENANT,
+        List.of("value3", "value4"));
+    NodeConfig brokerNodeConfig = new NodeConfig(NodeConfig.Type.BROKER_NODE, 
validEnforcementProfile,
+        brokerPropagationScheme);
+    List<NodeConfig> nodeConfigs = List.of(serverNodeConfig, brokerNodeConfig);
+    QueryWorkloadConfig validQueryWorkloadConfig = new 
QueryWorkloadConfig("workloadId", nodeConfigs);
+
+    // 1) Valid scenario
+    ZNRecord validZnRecord = new ZNRecord("validId");
+    ZNRecord expectedValidZnRecord = new ZNRecord("validId");
+    validZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    
expectedValidZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    String nodeConfigsJson = JsonUtils.objectToString(nodeConfigs);
+    validZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
nodeConfigsJson);
+    expectedValidZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
nodeConfigsJson);
+    data.add(new Object[] { validQueryWorkloadConfig, validZnRecord, 
expectedValidZnRecord, false });
+
+    // 2) Null propagation scheme in both nodes
+    NodeConfig nodeConfigWithoutPropagation = new 
NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile,
+        null);
+    List<NodeConfig> nodeConfigsWithoutPropagation = 
List.of(nodeConfigWithoutPropagation);
+    QueryWorkloadConfig configWithoutPropagation = new 
QueryWorkloadConfig("noPropagation",
+        nodeConfigsWithoutPropagation);
+
+    String nodeConfigsNoPropagationJson = 
JsonUtils.objectToString(nodeConfigsWithoutPropagation);
+
+    ZNRecord znRecordNoPropagation = new ZNRecord("noPropagationId");
+    ZNRecord expectedZnRecordNoPropagation = new ZNRecord("noPropagationId");
+    
znRecordNoPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"noPropagation");
+    znRecordNoPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
nodeConfigsNoPropagationJson);
+
+    
expectedZnRecordNoPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
 "noPropagation");
+    
expectedZnRecordNoPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
nodeConfigsNoPropagationJson);
+    data.add(new Object[] { configWithoutPropagation, znRecordNoPropagation, 
expectedZnRecordNoPropagation, false });
+
+    // 3) Null server node in QueryWorkloadConfig
+    List<NodeConfig> nodeConfigsWithNullServerNode = List.of(brokerNodeConfig);
+    QueryWorkloadConfig nullServerNodeConfig = new 
QueryWorkloadConfig("nullServer", nodeConfigsWithNullServerNode);
+    ZNRecord znRecordNullServer = new ZNRecord("nullServerId");
+    ZNRecord expectedZnRecordNullServer = new ZNRecord("nullServerId");
+    znRecordNullServer.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"nullServer");
+    
expectedZnRecordNullServer.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
 "nullServer");
+    String nodeConfigsWithNullServerJson = 
JsonUtils.objectToString(nodeConfigsWithNullServerNode);
+    znRecordNullServer.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
nodeConfigsWithNullServerJson);
+    
expectedZnRecordNullServer.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
nodeConfigsWithNullServerJson);
+    data.add(new Object[] { nullServerNodeConfig, znRecordNullServer, 
expectedZnRecordNullServer, false });
+
+    // 4) Null QueryWorkloadConfig -> should fail
+    ZNRecord znRecordNullConfig = new ZNRecord("nullConfigId");
+    data.add(new Object[] { null, znRecordNullConfig, null, true });
+
+    // 5) Null ZNRecord -> should fail
+    data.add(new Object[] { validQueryWorkloadConfig, null, null, true });
+
+    // 6) Behavior with empty ZNRecord ID
+    ZNRecord emptyIdZnRecord = new ZNRecord("");
+    ZNRecord expectedEmptyIdZnRecord = new ZNRecord("");
+    emptyIdZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    
expectedEmptyIdZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId");
+    String emptyNodeConfigsJson = JsonUtils.objectToString(nodeConfigs);
+    emptyIdZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
emptyNodeConfigsJson);
+    expectedEmptyIdZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, 
emptyNodeConfigsJson);
+    data.add(new Object[] { validQueryWorkloadConfig, emptyIdZnRecord, 
expectedEmptyIdZnRecord, false });
+
+    return data.toArray(new Object[0][]);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index fea05fc8b2..06305deb4c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -53,6 +53,7 @@ public class Constants {
   public static final String APP_CONFIGS = "AppConfigs";
   public static final String PERIODIC_TASK_TAG = "PeriodicTask";
   public static final String UPSERT_RESOURCE_TAG = "Upsert";
+  public static final String QUERY_WORKLOAD_TAG = "QueryWorkload";
 
   public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = 
"RealtimeSegmentValidationManager";
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java
new file mode 100644
index 0000000000..35bdf6f9a2
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+@Api(tags = Constants.QUERY_WORKLOAD_TAG, authorizations = 
{@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
+    @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
+        SWAGGER_AUTHORIZATION_KEY, description =
+        "The format of the key is  ```\"Basic <token>\" or \"Bearer "
+            + "<token>\"```"), @ApiKeyAuthDefinition(name = 
CommonConstants.QUERY_WORKLOAD, in =
+    ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = 
CommonConstants.QUERY_WORKLOAD, description =
+    "Workload context passed through http header. If no context is provided 
'default' workload "
+        + "context will be considered.")
+}))
+@Path("/")
+public class PinotQueryWorkloadRestletResource {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotQueryWorkloadRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/queryWorkloadConfigs")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_QUERY_WORKLOAD_CONFIG)
+  @Authenticate(AccessType.READ)
+  @ApiOperation(value = "Get all query workload configs", notes = "Get all 
workload configs")
+  public String getQueryWorkloadConfigs(@Context HttpHeaders httpHeaders) {
+    try {
+      LOGGER.info("Received request to get all queryWorkloadConfigs");
+      List<QueryWorkloadConfig> queryWorkloadConfigs = 
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
+      if (queryWorkloadConfigs.isEmpty()) {
+        return JsonUtils.objectToString(Map.of());
+      }
+      String response = JsonUtils.objectToString(queryWorkloadConfigs);
+      LOGGER.info("Successfully fetched all queryWorkloadConfigs");
+      return response;
+    } catch (Exception e) {
+      String errorMessage = String.format("Error while getting all workload 
configs, error: %s", e);
+      throw new ControllerApplicationException(LOGGER, errorMessage, 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * API to fetch query workload config
+   * @param queryWorkloadName Name of the query workload
+   * Example request:
+   * /queryWorkloadConfigs/workload-foo1
+   * Example response:
+   * {
+   *   "queryWorkloadName" : "workload-foo1",
+   *   "nodeConfigs" : {
+   *   {
+   *       "nodeType" : "brokerNode",
+   *       "enforcementProfile": {
+   *         "cpuCostNs": 500,
+   *         "memoryCostBytes": 1000
+   *       },
+   *       "propagationScheme": {
+   *         "propagationType": "TABLE",
+   *         "values": ["airlineStats"]
+   *       }
+   *     },
+   *     {
+   *       "nodeType" : "serverNode",
+   *       "enforcementProfile": {
+   *         "cpuCostNs": 1500,
+   *         "memoryCostBytes": 12000
+   *       },
+   *       "propagationScheme": {
+   *         "propagationType": "TENANT",
+   *         "values": ["DefaultTenant"]
+   *       }
+   *     }
+   *   }
+   * }
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/queryWorkloadConfigs/{queryWorkloadName}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_QUERY_WORKLOAD_CONFIG)
+  @Authenticate(AccessType.READ)
+  @ApiOperation(value = "Get query workload config", notes = "Get workload 
configs for the workload name")
+  public String getQueryWorkloadConfig(@PathParam("queryWorkloadName") String 
queryWorkloadName,
+      @Context HttpHeaders httpHeaders) {
+    try {
+      LOGGER.info("Received request to get workload config for workload: {}", 
queryWorkloadName);
+      QueryWorkloadConfig queryWorkloadConfig = 
_pinotHelixResourceManager.getQueryWorkloadConfig(queryWorkloadName);
+      if (queryWorkloadConfig == null) {
+        throw new ControllerApplicationException(LOGGER, "Workload config not 
found for workload: " + queryWorkloadName,
+            Response.Status.NOT_FOUND, null);
+      }
+      String response = queryWorkloadConfig.toJsonString();
+      LOGGER.info("Successfully fetched workload config for workload: {}", 
queryWorkloadName);
+      return response;
+    } catch (Exception e) {
+      if (e instanceof ControllerApplicationException) {
+        throw (ControllerApplicationException) e;
+      } else {
+        String errorMessage = String.format("Error while getting workload 
config for workload: %s, error: %s",
+            queryWorkloadName, e);
+        throw new ControllerApplicationException(LOGGER, errorMessage, 
Response.Status.INTERNAL_SERVER_ERROR, e);
+      }
+    }
+  }
+
+
+  /**
+   * API to get all workload configs associated with the instance
+   * @param instanceName Helix instance name
+   * @return Map of workload name to instance cost
+   * Example request:
+   * /queryWorkloadConfigs/instance/Server_localhost_1234
+   * Example response:
+   * {
+   *  "workload1": {
+   *    "cpuCostNs": 100,
+   *    "memoryCostBytes":100
+   *  },
+   *  "workload2": {
+   *    "cpuCostNs": 50,
+   *    "memoryCostBytes": 50
+   *  }
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/queryWorkloadConfigs/instance/{instanceName}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_INSTANCE_QUERY_WORKLOAD_CONFIG)
+  @Authenticate(AccessType.READ)
+  @ApiOperation(value = "Get all workload configs associated with the 
instance",
+      notes = "Get all workload configs associated with the instance")
+  public String getQueryWorkloadConfigForInstance(@PathParam("instanceName") 
String instanceName,
+                                                  @Context HttpHeaders 
httpHeaders) {
+    try {
+      Map<String, InstanceCost> workloadToInstanceCostMap = 
_pinotHelixResourceManager.getQueryWorkloadManager()
+          .getWorkloadToInstanceCostFor(instanceName);
+      if (workloadToInstanceCostMap == null || 
workloadToInstanceCostMap.isEmpty()) {
+        throw new ControllerApplicationException(LOGGER, "No workload configs 
found for instance: " + instanceName,
+            Response.Status.NOT_FOUND, null);
+      }
+      return JsonUtils.objectToString(workloadToInstanceCostMap);
+    } catch (Exception e) {
+      if (e instanceof ControllerApplicationException) {
+        throw (ControllerApplicationException) e;
+      } else {
+        String errorMessage = String.format("Error while getting workload 
config for instance: %s, error: %s",
+            instanceName, e);
+        throw new ControllerApplicationException(LOGGER, errorMessage, 
Response.Status.INTERNAL_SERVER_ERROR, e);
+      }
+    }
+  }
+
+  /**
+   * Updates the query workload config
+   * @param requestString JSON string representing the QueryWorkloadConfig
+   * Example request:
+   * {
+   *   "queryWorkloadName" : "workload-foo1",
+   *   "nodeConfigs" : {
+   *    {
+   *       "nodeType" : "brokerNode",
+   *       "enforcementProfile": {
+   *         "cpuCostNs": 500,
+   *         "memoryCostBytes": 1000
+   *       },
+   *       "propagationScheme": {
+   *         "propagationType": "TABLE",
+   *         "values": ["airlineStats"]
+   *       }
+   *     },
+   *     {
+   *       "nodeType" : "serverNode",
+   *       "enforcementProfile": {
+   *         "cpuCostNs": 1500,
+   *         "memoryCostBytes": 12000
+   *       },
+   *       "propagationScheme": {
+   *         "propagationType": "TENANT",
+   *         "values": ["DefaultTenant"]
+   *       }
+   *     }
+   *   }
+   * }
+   *
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/queryWorkloadConfigs")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_QUERY_WORKLOAD_CONFIG)
+  @Authenticate(AccessType.UPDATE)
+  @ApiOperation(value = "Update query workload config", notes = "Update 
workload config for the workload name")
+  public Response updateQueryWorkloadConfig(String requestString, @Context 
HttpHeaders httpHeaders) {
+    try {
+      LOGGER.info("Received request to update queryWorkloadConfig with 
request: {}", requestString);
+      QueryWorkloadConfig queryWorkloadConfig = 
JsonUtils.stringToObject(requestString, QueryWorkloadConfig.class);
+      List<String> validationErrors = 
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);
+      if (!validationErrors.isEmpty()) {
+        String errorMessage = String.format("Invalid query workload config: 
%s", validationErrors);
+        throw new ControllerApplicationException(LOGGER, errorMessage, 
Response.Status.BAD_REQUEST, null);
+      }
+      _pinotHelixResourceManager.setQueryWorkloadConfig(queryWorkloadConfig);
+      String successMessage = String.format("Query Workload config updated 
successfully for workload: %s",
+          queryWorkloadConfig.getQueryWorkloadName());
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (Exception e) {
+      String errorMessage = String.format("Error when updating query workload 
request: %s, error: %s",
+          requestString, e);
+      throw new ControllerApplicationException(LOGGER, errorMessage, 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Deletes the query workload config
+   * @param queryWorkloadName Name of the query workload to be deleted
+   * Example request:
+   * /queryWorkloadConfigs/workload-foo1
+   */
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/queryWorkloadConfigs/{queryWorkloadName}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DELETE_QUERY_WORKLOAD_CONFIG)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Delete query workload config", notes = "Delete 
workload config for the workload name")
+  public Response deleteQueryWorkloadConfig(@PathParam("queryWorkloadName") 
String queryWorkloadName,
+      @Context HttpHeaders httpHeaders) {
+    try {
+      _pinotHelixResourceManager.deleteQueryWorkloadConfig(queryWorkloadName);
+      String successMessage = String.format("Query Workload config deleted 
successfully for workload: %s",
+          queryWorkloadName);
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (Exception e) {
+      String errorMessage = String.format("Error when deleting query workload 
for workload: %s, error: %s",
+          queryWorkloadName, e);
+      throw new ControllerApplicationException(LOGGER, errorMessage, 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * API to refresh propagation for a single query workload config
+   * This API doesn't update the config, it only triggers the propagation of 
an existing workload config
+   *
+   * @param queryWorkloadName Name of the query workload to refresh
+   * Example request:
+   * POST /queryWorkloadConfigs/{queryWorkloadName}/refresh
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/queryWorkloadConfigs/{queryWorkloadName}/refresh")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.UPDATE_QUERY_WORKLOAD_CONFIG)
+  @Authenticate(AccessType.UPDATE)
+  @ApiOperation(value = "Refresh query workload config propagation", notes = 
"Force propagation of an existing config")
+  public Response refreshQueryWorkloadConfig(@PathParam("queryWorkloadName") 
String queryWorkloadName,
+                                             @Context HttpHeaders httpHeaders) 
{
+    try {
+      LOGGER.info("Received request to refresh workload config propagation for 
workload: {}", queryWorkloadName);
+      // Fetch existing config
+      QueryWorkloadConfig existingConfig = 
_pinotHelixResourceManager.getQueryWorkloadConfig(queryWorkloadName);
+      if (existingConfig == null) {
+        throw new ControllerApplicationException(LOGGER, "Workload config not 
found for workload: " + queryWorkloadName,
+            Response.Status.NOT_FOUND, null);
+      }
+      
_pinotHelixResourceManager.getQueryWorkloadManager().propagateWorkloadUpdateMessage(existingConfig);
+      String successMessage = String.format("Query workload config propagation 
triggered for workload: %s",
+          queryWorkloadName);
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (Exception e) {
+      if (e instanceof ControllerApplicationException) {
+        throw (ControllerApplicationException) e;
+      } else {
+        String errorMessage = String.format("Error when refreshing query 
workload config for workload: %s, error: %s",
+            queryWorkloadName, e);
+        throw new ControllerApplicationException(LOGGER, errorMessage,
+            Response.Status.INTERNAL_SERVER_ERROR, e);
+      }
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a07caac5f1..5589bce4be 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -105,6 +105,7 @@ import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
 import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
 import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -155,6 +156,7 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.workload.QueryWorkloadManager;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -170,6 +172,7 @@ import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.user.ComponentType;
 import org.apache.pinot.spi.config.user.RoleType;
 import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -237,6 +240,7 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
   private final LineageManager _lineageManager;
+  private final QueryWorkloadManager _queryWorkloadManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
@@ -263,6 +267,7 @@ public class PinotHelixResourceManager {
       _lineageUpdaterLocks[i] = new Object();
     }
     _lineageManager = lineageManager;
+    _queryWorkloadManager = new QueryWorkloadManager(this);
   }
 
   public PinotHelixResourceManager(ControllerConf controllerConf) {
@@ -544,6 +549,11 @@ public class PinotHelixResourceManager {
         .filter(instance -> 
InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList());
   }
 
+  public List<String> getAllServerInstances() {
+    return HelixHelper.getAllInstances(_helixAdmin, 
_helixClusterName).stream().filter(InstanceTypeUtils::isServer)
+        .collect(Collectors.toList());
+  }
+
   /**
    * Get all instances with the given tag
    */
@@ -1831,7 +1841,7 @@ public class PinotHelixResourceManager {
           .put(tableNameWithType, 
SegmentAssignmentUtils.getInstanceStateMap(brokers, 
BrokerResourceStateModel.ONLINE));
       return is;
     });
-
+    _queryWorkloadManager.propagateWorkloadFor(tableNameWithType);
     LOGGER.info("Adding table {}: Successfully added table", 
tableNameWithType);
   }
 
@@ -2195,6 +2205,8 @@ public class PinotHelixResourceManager {
 
     // Send update query quota message if quota is specified
     sendTableConfigRefreshMessage(tableNameWithType);
+    // TODO: Propagate workload for tables if there is change is change 
instance characteristics
+    _queryWorkloadManager.propagateWorkloadFor(tableNameWithType);
   }
 
   public void deleteUser(String username) {
@@ -4788,6 +4800,53 @@ public class PinotHelixResourceManager {
     return tagMinInstanceMap;
   }
 
+  public List<QueryWorkloadConfig> getAllQueryWorkloadConfigs() {
+    return ZKMetadataProvider.getAllQueryWorkloadConfigs(_propertyStore);
+  }
+
+  @Nullable
+  public QueryWorkloadConfig getQueryWorkloadConfig(String queryWorkloadName) {
+    return ZKMetadataProvider.getQueryWorkloadConfig(_propertyStore, 
queryWorkloadName);
+  }
+
+  public void setQueryWorkloadConfig(QueryWorkloadConfig queryWorkloadConfig) {
+    if (!ZKMetadataProvider.setQueryWorkloadConfig(_propertyStore, 
queryWorkloadConfig)) {
+      throw new RuntimeException("Failed to set workload config for 
queryWorkloadName: "
+          + queryWorkloadConfig.getQueryWorkloadName());
+    }
+    _queryWorkloadManager.propagateWorkloadUpdateMessage(queryWorkloadConfig);
+  }
+
+  public void sendQueryWorkloadRefreshMessage(Map<String, 
QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) {
+    instanceToRefreshMessageMap.forEach((instance, message) -> {
+      Criteria criteria = new Criteria();
+      criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      criteria.setInstanceName(instance);
+      criteria.setSessionSpecific(true);
+
+      int numMessagesSent = 
_helixZkManager.getMessagingService().send(criteria, message, null, -1);
+      if (numMessagesSent > 0) {
+        LOGGER.info("Sent {} query workload config refresh messages to 
instance: {}", numMessagesSent, instance);
+      } else {
+        LOGGER.warn("No query workload config refresh message sent to 
instance: {}", instance);
+      }
+    });
+  }
+
+  public void deleteQueryWorkloadConfig(String workload) {
+    QueryWorkloadConfig queryWorkloadConfig = getQueryWorkloadConfig(workload);
+    if (queryWorkloadConfig == null) {
+      LOGGER.warn("Query workload config for {} does not exist, skipping 
deletion", workload);
+      return;
+    }
+    _queryWorkloadManager.propagateDeleteWorkloadMessage(queryWorkloadConfig);
+    ZKMetadataProvider.deleteQueryWorkloadConfig(_propertyStore, workload);
+  }
+
+  public QueryWorkloadManager getQueryWorkloadManager() {
+    return _queryWorkloadManager;
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java
new file mode 100644
index 0000000000..c093b98755
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.scheme.PropagationScheme;
+import org.apache.pinot.controller.workload.scheme.PropagationSchemeProvider;
+import org.apache.pinot.controller.workload.scheme.PropagationUtils;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
+import org.apache.pinot.controller.workload.splitter.DefaultCostSplitter;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The QueryWorkloadManager is responsible for managing the query workload 
configuration and propagating/computing
+ * the cost to be enforced by relevant instances based on the propagation 
scheme.
+ */
+public class QueryWorkloadManager {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(QueryWorkloadManager.class);
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PropagationSchemeProvider _propagationSchemeProvider;
+  private final CostSplitter _costSplitter;
+
+  public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _propagationSchemeProvider = new 
PropagationSchemeProvider(pinotHelixResourceManager);
+    // TODO: To make this configurable once we have multiple cost splitters 
implementations
+    _costSplitter = new DefaultCostSplitter();
+  }
+
+  /**
+   * Propagate the workload to the relevant instances based on the 
PropagationScheme
+   * @param queryWorkloadConfig The query workload configuration to propagate
+   * 1. Resolve the instances based on the node type and propagation scheme
+   * 2. Calculate the instance cost for each instance
+   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   */
+  public void propagateWorkloadUpdateMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
+    String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+      // Resolve the instances based on the node type and propagation scheme
+      Set<String> instances = resolveInstances(nodeConfig);
+      if (instances.isEmpty()) {
+        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
+        LOGGER.warn(errorMsg);
+        continue;
+      }
+      Map<String, InstanceCost> instanceCostMap = 
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
+      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+              QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
+      // Send the QueryWorkloadRefreshMessage to the instances
+      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+    }
+  }
+
+  /**
+   * Propagate delete workload refresh message for the given 
queryWorkloadConfig
+   * @param queryWorkloadConfig The query workload configuration to delete
+   * 1. Resolve the instances based on the node type and propagation scheme
+   * 2. Send the {@link QueryWorkloadRefreshMessage} with 
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+   */
+  public void propagateDeleteWorkloadMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
+    String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+      Set<String> instances = resolveInstances(nodeConfig);
+      if (instances.isEmpty()) {
+        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
+        LOGGER.warn(errorMsg);
+        continue;
+      }
+      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
+          .collect(Collectors.toMap(instance -> instance, instance -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+              QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
null)));
+      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+    }
+  }
+
+  /**
+   * Propagate the workload for the given table name, it does fast exits if 
queryWorkloadConfigs is empty
+   * @param tableName The table name to propagate the workload for, it can be 
a rawTableName or a tableNameWithType
+   * if rawTableName is provided, it will resolve all available tableTypes and 
propagate the workload for each tableType
+   *
+   * This method performs the following steps:
+   * 1. Find all the helix tags associated with the table
+   * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
+   * 3. Propagate the workload cost for instances associated with the workloads
+   */
+  public void propagateWorkloadFor(String tableName) {
+    try {
+      List<QueryWorkloadConfig> queryWorkloadConfigs = 
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
+      if (queryWorkloadConfigs.isEmpty()) {
+          return;
+      }
+      // Get the helixTags associated with the table
+      List<String> helixTags = 
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName);
+      // Find all workloads associated with the helix tags
+      Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
+          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
helixTags, queryWorkloadConfigs);
+      // Propagate the workload for each QueryWorkloadConfig
+      for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
+        propagateWorkloadUpdateMessage(queryWorkloadConfig);
+      }
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to propagate workload for table: 
%s", tableName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+  }
+
+  /**
+   * Get all the workload costs associated with the given instance and node 
type
+   * 1. Find all the helix tags associated with the instance
+   * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
+   * 3. Find the instance associated with the {@link QueryWorkloadConfig} and 
node type
+   *
+   * @param instanceName The instance name to get the workload costs for
+   * @return A map of workload name to {@link InstanceCost} for the given 
instance and node type
+   */
+  public Map<String, InstanceCost> getWorkloadToInstanceCostFor(String 
instanceName) {
+    try {
+      Map<String, InstanceCost> workloadToInstanceCostMap = new HashMap<>();
+      List<QueryWorkloadConfig> queryWorkloadConfigs = 
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
+      if (queryWorkloadConfigs.isEmpty()) {
+        LOGGER.warn("No query workload configs found in zookeeper");
+        return workloadToInstanceCostMap;
+      }
+      // Find all the helix tags associated with the instance
+      InstanceConfig instanceConfig = 
_pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
+      if (instanceConfig == null) {
+        LOGGER.warn("Instance config not found for instance: {}", 
instanceName);
+        return workloadToInstanceCostMap;
+      }
+      NodeConfig.Type nodeType;
+      if (InstanceTypeUtils.isServer(instanceName)) {
+        nodeType = NodeConfig.Type.SERVER_NODE;
+      } else if (InstanceTypeUtils.isBroker(instanceName)) {
+        nodeType = NodeConfig.Type.BROKER_NODE;
+      } else {
+        LOGGER.warn("Unsupported instance type: {}, cannot compute workload 
costs", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // Find all workloads associated with the helix tags
+      Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
+          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceConfig.getTags(),
+                  queryWorkloadConfigs);
+      // Calculate the instance cost from each workload
+      for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
+        for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
+          if (nodeConfig.getNodeType() == nodeType) {
+            Set<String> instances = resolveInstances(nodeConfig);
+            InstanceCost instanceCost = 
_costSplitter.computeInstanceCost(nodeConfig, instances, instanceName);
+            if (instanceCost != null) {
+              
workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), 
instanceCost);
+            }
+            break;
+          }
+        }
+      }
+      return workloadToInstanceCostMap;
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to get workload to instance cost 
map for instance: %s", instanceName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+  }
+
+  private Set<String> resolveInstances(NodeConfig nodeConfig) {
+    PropagationScheme propagationScheme =
+            
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
+    return propagationScheme.resolveInstances(nodeConfig);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java
new file mode 100644
index 0000000000..e9a373733b
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+
+public class DefaultPropagationScheme implements PropagationScheme {
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public DefaultPropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+  }
+
+  @Override
+  public Set<String> resolveInstances(NodeConfig nodeConfig) {
+    Set<String> instances;
+    NodeConfig.Type nodeType = nodeConfig.getNodeType();
+    switch (nodeType) {
+      case BROKER_NODE:
+        instances = new 
HashSet<>(_pinotHelixResourceManager.getAllBrokerInstances());
+        break;
+      case SERVER_NODE:
+        instances = new 
HashSet<>(_pinotHelixResourceManager.getAllServerInstances());
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid node type: " + nodeType);
+    }
+    return instances;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java
new file mode 100644
index 0000000000..d73602d6ec
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.Set;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+/**
+ * PropagationScheme is used to resolve instances based on the {@link 
NodeConfig}
+ * 1. It helps to identify which instances to propagate the workload to based 
on the node configuration
+ * 2. It helps among which instances the {@link 
org.apache.pinot.spi.config.workload.EnforcementProfile} should be split
+ */
+public interface PropagationScheme {
+  /**
+   * Resolve the instances based on the node type and node configuration
+   * @param nodeConfig The {@link NodeConfig} to resolve the instances
+   * @return The set of instances to propagate the workload
+   */
+  Set<String> resolveInstances(NodeConfig nodeConfig);
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationSchemeProvider.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationSchemeProvider.java
new file mode 100644
index 0000000000..d3e2eea167
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationSchemeProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+
+public class PropagationSchemeProvider {
+
+    private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+    public PropagationSchemeProvider(PinotHelixResourceManager 
pinotHelixResourceManager) {
+        _pinotHelixResourceManager = pinotHelixResourceManager;
+    }
+
+    public PropagationScheme getPropagationScheme(
+            org.apache.pinot.spi.config.workload.PropagationScheme.Type 
schemeType) {
+        switch (schemeType) {
+        case TABLE:
+            return new TablePropagationScheme(_pinotHelixResourceManager);
+        case TENANT:
+            return new TenantPropagationScheme(_pinotHelixResourceManager);
+        default:
+            return new DefaultPropagationScheme(_pinotHelixResourceManager);
+        }
+    }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java
new file mode 100644
index 0000000000..fa720e9e73
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * This class provides utility methods for workload propagation.
+ */
+public class PropagationUtils {
+
+  private PropagationUtils() {
+  }
+
+  /**
+   * Get the mapping tableNameWithType → {BROKER_NODE→brokerTag, 
SERVER_NODE→(serverTag + overrides)}
+   * 1. Get all table configs from the PinotHelixResourceManager
+   * 2. For each table config, extract the tenant config
+   * 3. For each tenant config, get the broker and server tags
+   * 4. Populate the helix tags for BROKER_NODE and SERVER_NODE separately
+   */
+  public static Map<String, Map<NodeConfig.Type, Set<String>>> 
getTableToHelixTags(
+          PinotHelixResourceManager pinotResourceManager) {
+    Map<String, Map<NodeConfig.Type, Set<String>>> tableToTags = new 
HashMap<>();
+    for (TableConfig tableConfig : pinotResourceManager.getAllTableConfigs()) {
+      TenantConfig tenantConfig = tableConfig.getTenantConfig();
+      TableType tableType = tableConfig.getTableType();
+
+      // Gather all relevant tags for this tenant
+      List<String> tenantTags = new ArrayList<>();
+      collectHelixTagsForTable(tenantTags, tenantConfig, tableType);
+
+      // Populate the helix tags for BROKER_NODE and SERVER_NODE separately to 
provide flexibility
+      // in workload propagation to direct the workload to only specific node 
types
+      String brokerTag = 
TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
+      Set<String> brokerTags = Collections.singleton(brokerTag);
+
+      Set<String> serverTags = new HashSet<>(tenantTags);
+      serverTags.remove(brokerTag);
+
+      Map<NodeConfig.Type, Set<String>> nodeTypeToTags = new 
EnumMap<>(NodeConfig.Type.class);
+      nodeTypeToTags.put(NodeConfig.Type.BROKER_NODE, brokerTags);
+      nodeTypeToTags.put(NodeConfig.Type.SERVER_NODE, serverTags);
+
+      tableToTags.put(tableConfig.getTableName(), nodeTypeToTags);
+    }
+    return tableToTags;
+  }
+
+  private static void collectHelixTagsForTable(List<String> tags, TenantConfig 
tenantConfig, TableType tableType) {
+    tags.add(TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker()));
+    if (tableType == TableType.OFFLINE) {
+      tags.add(TagNameUtils.getOfflineTagForTenant(tenantConfig.getServer()));
+    } else {
+      // Returns the realtime tag if completed server tag is not set
+      String completedServerTag = 
TagNameUtils.extractCompletedServerTag(tenantConfig);
+      // Returns the realtime tag if consuming server tag is not set
+      String consumingServerTag = 
TagNameUtils.extractConsumingServerTag(tenantConfig);
+      if (completedServerTag.equals(consumingServerTag)) {
+        tags.add(completedServerTag);
+      } else {
+        tags.add(consumingServerTag);
+        tags.add(completedServerTag);
+      }
+    }
+  }
+
+  /**
+   * Get the helix tags for a given table name.
+   * If the table name does not have a type suffix, it will return both 
offline and realtime tags.
+   */
+  public static List<String> getHelixTagsForTable(PinotHelixResourceManager 
pinotResourceManager, String tableName) {
+    List<String> combinedTags = new ArrayList<>();
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    List<String> tablesWithType = (tableType == null)
+        ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
+            TableNameBuilder.REALTIME.tableNameWithType(tableName))
+        : Collections.singletonList(tableName);
+    for (String table : tablesWithType) {
+      TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
+      if (tableConfig != null) {
+        collectHelixTagsForTable(combinedTags, tableConfig.getTenantConfig(), 
tableConfig.getTableType());
+      }
+    }
+    return combinedTags;
+  }
+
+  /**
+   * Get the mapping between helix tag -> instances
+   */
+  public static Map<String, Set<String>> 
getHelixTagToInstances(PinotHelixResourceManager pinotResourceManager) {
+    Map<String, Set<String>> tagToInstances = new HashMap<>();
+    for (InstanceConfig instanceConfig : 
pinotResourceManager.getAllHelixInstanceConfigs()) {
+      String instanceName = instanceConfig.getInstanceName();
+      for (String helixTag : instanceConfig.getTags()) {
+        tagToInstances.computeIfAbsent(helixTag, tag -> new 
HashSet<>()).add(instanceName);
+      }
+    }
+    return tagToInstances;
+  }
+
+  /**
+   * Returns the set of {@link QueryWorkloadConfig}s that match any of the 
given Helix tags.
+   *
+   * This method filters the provided list of QueryWorkloadConfigs based on 
whether their propagation
+   * targets intersect with the specified `filterTags`. The matching is 
performed based on the
+   * propagation type defined for each node in the config:
+   *
+   * - For {@code TENANT} propagation:
+   *   1. Each value in the propagation scheme is treated as a tenant name or 
direct Helix tag.
+   *   2. If the value is a recognized Helix tag (broker/server), it is used 
directly.
+   *   3. Otherwise, the value is resolved to possible broker and server tags 
for the tenant.
+   *   4. If any resolved tag matches one of the `filterTags`, the config is 
included.
+   *
+   * - For {@code TABLE} propagation:
+   *   1. Table names are expanded to include type-suffixed forms (OFFLINE 
and/or REALTIME).
+   *   2. These table names are mapped to corresponding Helix tags using node 
type.
+   *   3. If any of the mapped tags intersect with `filterTags`, the config is 
included.
+   *
+   * @param pinotHelixResourceManager The resource manager used to look up 
table and instance metadata.
+   * @param filterTags The set of Helix tags used for filtering configs.
+   * @param queryWorkloadConfigs The full list of workload configs to evaluate.
+   * @return A set of workload configs whose propagation targets intersect 
with the filterTags.
+   */
+  public static Set<QueryWorkloadConfig> getQueryWorkloadConfigsForTags(
+      PinotHelixResourceManager pinotHelixResourceManager, List<String> 
filterTags,
+      List<QueryWorkloadConfig> queryWorkloadConfigs) {
+    Set<QueryWorkloadConfig> matchedConfigs = new HashSet<>();
+    Map<String, Map<NodeConfig.Type, Set<String>>> tableToHelixTags = 
getTableToHelixTags(pinotHelixResourceManager);
+
+    for (QueryWorkloadConfig queryWorkloadConfig : queryWorkloadConfigs) {
+      for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
+        PropagationScheme scheme = nodeConfig.getPropagationScheme();
+
+        if (scheme.getPropagationType() == PropagationScheme.Type.TENANT) {
+          for (String tenant : scheme.getValues()) {
+            Set<String> resolvedTags = TagNameUtils.isOfflineServerTag(tenant)
+                    || TagNameUtils.isRealtimeServerTag(tenant) || 
TagNameUtils.isBrokerTag(tenant)
+                ? Collections.singleton(tenant)
+                : new HashSet<>(getAllPossibleHelixTagsFor(tenant));
+            if (!Collections.disjoint(resolvedTags, filterTags)) {
+              matchedConfigs.add(queryWorkloadConfig);
+              break;
+            }
+          }
+        } else if (scheme.getPropagationType() == 
PropagationScheme.Type.TABLE) {
+          for (String tableName : scheme.getValues()) {
+            TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+            List<String> tablesWithType = (tableType == null)
+                ? 
Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
+                    TableNameBuilder.REALTIME.tableNameWithType(tableName))
+                : Collections.singletonList(tableName);
+            for (String tableWithType : tablesWithType) {
+              Set<String> resolvedTags = tableToHelixTags
+                  .getOrDefault(tableWithType, Collections.emptyMap())
+                  .getOrDefault(nodeConfig.getNodeType(), 
Collections.emptySet());
+              if (!Collections.disjoint(resolvedTags, filterTags)) {
+                matchedConfigs.add(queryWorkloadConfig);
+                break;
+              }
+            }
+          }
+        }
+      }
+    }
+    return matchedConfigs;
+  }
+
+  private static List<String> getAllPossibleHelixTagsFor(String tenantName) {
+    List<String> helixTags = new ArrayList<>();
+    helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName));
+    helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName));
+    helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName));
+    return helixTags;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java
new file mode 100644
index 0000000000..c463eb87db
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+/**
+ * TablePropagationScheme is used to resolve instances based on the {@link 
NodeConfig} and {@link NodeConfig.Type}
+ * It resolves the instances based on the table names specified in the node 
configuration
+ */
+public class TablePropagationScheme implements PropagationScheme {
+
+  private static PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public TablePropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+  }
+
+  @Override
+  public Set<String> resolveInstances(NodeConfig nodeConfig) {
+    Set<String> instances = new HashSet<>();
+    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
+    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
+            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
+    Map<String, Set<String>> helixTagToInstances
+            = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+    for (String tableName : tableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+      List<String> tablesWithType = new ArrayList<>();
+      if (tableType == null) {
+        // Get both offline and realtime table names if type is not present.
+        
tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+        
tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+      } else {
+        tablesWithType.add(tableName);
+      }
+      for (String tableWithType : tablesWithType) {
+        Map<NodeConfig.Type, Set<String>> nodeToHelixTags = 
tableWithTypeToHelixTags.get(tableWithType);
+        if (nodeToHelixTags != null) {
+          Set<String> helixTags = 
nodeToHelixTags.get(nodeConfig.getNodeType());
+          if (helixTags != null) {
+            for (String helixTag : helixTags) {
+              Set<String> helixInstances = helixTagToInstances.get(helixTag);
+              if (helixInstances != null) {
+                instances.addAll(helixInstances);
+              }
+            }
+          }
+        }
+      }
+    }
+    return instances;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java
new file mode 100644
index 0000000000..4564b0c282
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+
+/**
+ * TenantPropagationScheme is used to resolve instances based on the {@link 
NodeConfig} and {@link NodeConfig.Type}
+ * It resolves the instances based on the tenants specified in the node 
configuration
+ */
+public class TenantPropagationScheme implements PropagationScheme {
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public TenantPropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+  }
+
+  @Override
+  public Set<String> resolveInstances(NodeConfig nodeConfig) {
+    Map<String, Set<String>> helixTagToInstances
+            = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+    Set<String> allInstances = new HashSet<>();
+    List<String> tenantNames = nodeConfig.getPropagationScheme().getValues();
+    NodeConfig.Type nodeType = nodeConfig.getNodeType();
+    // Get the unique set of helix tags for the tenants
+    Set<String> helixTags = new HashSet<>();
+    for (String tenantName : tenantNames) {
+      if (nodeType == NodeConfig.Type.BROKER_NODE) {
+        helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName));
+      } else if (nodeType == NodeConfig.Type.SERVER_NODE) {
+        if (TagNameUtils.isOfflineServerTag(tenantName) || 
TagNameUtils.isRealtimeServerTag(tenantName)) {
+          helixTags.add(tenantName);
+        } else {
+          helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName));
+          helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName));
+        }
+      }
+    }
+    // Get the instances for the helix tags
+    for (String helixTag : helixTags) {
+      Set<String> instances = helixTagToInstances.get(helixTag);
+      if (instances != null) {
+        allInstances.addAll(instances);
+      }
+    }
+    return allInstances;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java
new file mode 100644
index 0000000000..9f7f070fb1
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.splitter;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+/**
+ * Interface for splitting the cost of a workload between instances.
+ */
+public interface CostSplitter {
+  /**
+   * Computes the cost for each instance in the given set of instances.
+   *
+   * @param nodeConfig the node configuration
+   * @param instances names of all instances involved
+   * @return a map from instance identifier to the cost for that instance
+   */
+  Map<String, InstanceCost> computeInstanceCostMap(NodeConfig nodeConfig, 
Set<String> instances);
+
+  /**
+   * Computes the cost for a specific instance.
+   *
+   * @param nodeConfig the node configuration
+   * @param instances names of all instances involved
+   * @param instance the instance identifier for which to compute the cost
+   * @return the cost for the specified instance
+   */
+  InstanceCost computeInstanceCost(NodeConfig nodeConfig, Set<String> 
instances, String instance);
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/DefaultCostSplitter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/DefaultCostSplitter.java
new file mode 100644
index 0000000000..9e40015a0b
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/DefaultCostSplitter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.splitter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+
+public class DefaultCostSplitter implements CostSplitter {
+
+  @Override
+  public Map<String, InstanceCost> computeInstanceCostMap(NodeConfig 
nodeConfig, Set<String> instances) {
+    InstanceCost cost = computeInstanceCost(nodeConfig, instances, null);
+    Map<String, InstanceCost> costMap = new HashMap<>();
+    for (String instance : instances) {
+      costMap.put(instance, cost);
+    }
+    return costMap;
+  }
+
+  @Override
+  public InstanceCost computeInstanceCost(NodeConfig nodeConfig, Set<String> 
instances, String instance) {
+    long totalInstances = instances.size();
+    EnforcementProfile enforcementProfile = nodeConfig.getEnforcementProfile();
+
+    long cpuCostNs = enforcementProfile.getCpuCostNs() / totalInstances;
+    long memoryCostBytes = enforcementProfile.getMemoryCostBytes() / 
totalInstances;
+
+    return new InstanceCost(cpuCostNs, memoryCostBytes);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/workload/PropagationUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/workload/PropagationUtilsTest.java
new file mode 100644
index 0000000000..f487e68225
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/workload/PropagationUtilsTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.scheme.PropagationUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PropagationUtilsTest {
+
+    private PinotHelixResourceManager _pinotHelixResourceManager;
+
+    @BeforeClass
+    public void setUp() {
+        _pinotHelixResourceManager = 
Mockito.mock(PinotHelixResourceManager.class);
+    }
+
+    @Test
+    public void getTableToHelixTagsTest() {
+        // Create a list of table configurations
+        List<TableConfig> tableConfigs = new ArrayList<>();
+        tableConfigs.add(createTableConfig("table1", "serverTag1", 
"brokerTenant1", TableType.OFFLINE));
+        tableConfigs.add(createTableConfig("table2", "serverTag2", 
"brokerTenant2", TableType.REALTIME));
+        // Mock the behavior of getAllTableConfigs to return the list of table 
configurations
+        
Mockito.when(_pinotHelixResourceManager.getAllTableConfigs()).thenReturn(tableConfigs);
+        // Call the method to get table to Helix tags
+        Map<String, Map<NodeConfig.Type, Set<String>>> tableToHelixTags
+                = 
PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
+        // Verify the results
+        Map<String, Map<NodeConfig.Type, Set<String>>> expectedTags = new 
HashMap<>();
+        expectedTags.put("table1_OFFLINE", new HashMap<>() {{
+            put(NodeConfig.Type.SERVER_NODE, Set.of("serverTag1_OFFLINE"));
+            put(NodeConfig.Type.BROKER_NODE, Set.of("brokerTenant1_BROKER"));
+        }});
+        expectedTags.put("table2_REALTIME", new HashMap<>() {{
+            put(NodeConfig.Type.SERVER_NODE, Set.of("serverTag2_REALTIME"));
+            put(NodeConfig.Type.BROKER_NODE, Set.of("brokerTenant2_BROKER"));
+        }});
+
+        Assert.assertEquals(tableToHelixTags.size(), expectedTags.size(),
+                "Expected size of table to helix tags mapping does not match");
+        for (Map.Entry<String, Map<NodeConfig.Type, Set<String>>> tableEntry : 
expectedTags.entrySet()) {
+            String tableName = tableEntry.getKey();
+            Map<NodeConfig.Type, Set<String>> expectedNodeTags = 
tableEntry.getValue();
+            // For each node type in the expected map, assert the tag exists
+            for (Map.Entry<NodeConfig.Type, Set<String>> entry : 
expectedNodeTags.entrySet()) {
+                NodeConfig.Type nodeType = entry.getKey();
+                Set<String> expectedTag = entry.getValue();
+                
Assert.assertEquals(tableToHelixTags.get(tableName).get(nodeType), expectedTag,
+                        "Expected " + expectedTag + " for " + tableName + " 
with node type " + nodeType
+                                + " but found " + 
tableToHelixTags.get(tableName).get(nodeType));
+            }
+        }
+    }
+
+    @Test
+    public void getHelixTagsForTableTest() {
+        // Mock the behavior of getHelixTagsForTable to return a set of helix 
tags
+        TableConfig tableConfig = createTableConfig("table1", "serverTag1", 
"brokerTenant1", TableType.OFFLINE);
+        TableConfig tableConfig2 = createTableConfig("table1", "serverTag2", 
"brokerTenant2", TableType.REALTIME);
+        
Mockito.when(_pinotHelixResourceManager.getTableConfig("table1_OFFLINE")).thenReturn(tableConfig);
+        
Mockito.when(_pinotHelixResourceManager.getTableConfig("table1_REALTIME")).thenReturn(tableConfig2);
+
+        // Define the expected helix tags for the table
+        Map<String, Set<String>> expected = new HashMap<>();
+        expected.put("table1_OFFLINE", Set.of("serverTag1_OFFLINE", 
"brokerTenant1_BROKER"));
+        expected.put("table1_REALTIME", Set.of("serverTag2_REALTIME", 
"brokerTenant2_BROKER"));
+
+        for (Map.Entry<String, Set<String>> entry : expected.entrySet()) {
+            String tableName = entry.getKey();
+            Set<String> expectedHelixTags = entry.getValue();
+            // Call the method to get helix tags for the table
+            List<String> helixTags = 
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager,
+                    tableName);
+            // Verify the results
+            for (String helixTag : expectedHelixTags) {
+                Assert.assertTrue(helixTags.contains(helixTag),
+                        "Expected helix tag " + helixTag + " for table " + 
tableName + " but found " + helixTags);
+            }
+        }
+    }
+
+    @Test
+    public void getHelixTagToInstancesTest() {
+        // Create a list of instance configurations
+        List<InstanceConfig> instanceConfigs = List.of(
+                createInstanceConfig("instance1", 
List.of("serverTag1_OFFLINE")),
+                createInstanceConfig("instance2", 
List.of("serverTag1_OFFLINE")),
+                createInstanceConfig("instance3", 
List.of("brokerTenant1_BROKER")),
+                createInstanceConfig("instance4", 
List.of("brokerTenant1_BROKER"))
+        );
+        
Mockito.when(_pinotHelixResourceManager.getAllHelixInstanceConfigs()).thenReturn(instanceConfigs);
+        // Call the method to get Helix tag to instances mapping
+        Map<String, Set<String>> helixTagToInstances
+                = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+        // Verify the results
+        Map<String, Set<String>> expected = new HashMap<>();
+        expected.put("serverTag1_OFFLINE", Set.of("instance1", "instance2"));
+        expected.put("brokerTenant1_BROKER", Set.of("instance3", "instance4"));
+
+        Assert.assertEquals(helixTagToInstances.size(), expected.size(),
+                "Expected size of helix tag to instances mapping does not 
match");
+        for (Map.Entry<String, Set<String>> entry : expected.entrySet()) {
+            String helixTag = entry.getKey();
+            Set<String> expectedInstances = entry.getValue();
+            Assert.assertTrue(helixTagToInstances.containsKey(helixTag),
+                    "Expected helix tag " + helixTag + " but not found in the 
mapping");
+            for (String instance : expectedInstances) {
+                
Assert.assertTrue(helixTagToInstances.get(helixTag).contains(instance),
+                        "Expected instance " + instance + " for helix tag " + 
helixTag + " but found "
+                                + helixTagToInstances.get(helixTag));
+            }
+        }
+    }
+
+    @Test
+    public void getQueryWorkloadConfigsForTagsTest() {
+        // Create a list of query workload configurations
+        QueryWorkloadConfig workloadConfig1 = 
createQueryWorkloadConfig("workload1",
+            new PropagationScheme(PropagationScheme.Type.TABLE, 
List.of("table1", "table2")),
+            new PropagationScheme(PropagationScheme.Type.TABLE, 
List.of("table1", "table2")));
+        QueryWorkloadConfig workloadConfig2 = 
createQueryWorkloadConfig("workload2",
+            new PropagationScheme(PropagationScheme.Type.TENANT, 
List.of("serverTag1")),
+            new PropagationScheme(PropagationScheme.Type.TENANT, 
List.of("brokerTenant1_BROKER")));
+        QueryWorkloadConfig workloadConfig3 = 
createQueryWorkloadConfig("workload3",
+            new PropagationScheme(PropagationScheme.Type.TENANT, 
List.of("serverTag2_REALTIME")),
+            new PropagationScheme(PropagationScheme.Type.TENANT, 
List.of("brokerTenant2")));
+        List<QueryWorkloadConfig> queryWorkloadConfigs = 
List.of(workloadConfig1, workloadConfig2, workloadConfig3);
+        // Create TableConfig for the workload
+        List<TableConfig> tableConfigs = List.of(
+                createTableConfig("table1", "serverTag1", "brokerTenant1", 
TableType.OFFLINE),
+                createTableConfig("table2", "serverTag2", "brokerTenant2", 
TableType.REALTIME)
+        );
+        // Mock the behavior of getAllTableConfigs to return the list of table 
configurations
+        
Mockito.when(_pinotHelixResourceManager.getAllTableConfigs()).thenReturn(tableConfigs);
+
+        List<String> helixTags = List.of("serverTag1_OFFLINE", 
"brokerTenant1_BROKER",
+                "serverTag2_REALTIME", "brokerTenant2_BROKER");
+        Set<QueryWorkloadConfig> workloadConfigsForTags =
+                
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
helixTags,
+                        queryWorkloadConfigs);
+        // Verify the results
+        Set<QueryWorkloadConfig> expectedWorkloadConfigs = 
Set.of(workloadConfig1, workloadConfig2, workloadConfig3);
+        Assert.assertEquals(workloadConfigsForTags.size(), 
expectedWorkloadConfigs.size(),
+                "Expected size of workload configs for tags does not match");
+        for (QueryWorkloadConfig workloadConfig : workloadConfigsForTags) {
+            Assert.assertTrue(expectedWorkloadConfigs.contains(workloadConfig),
+                    "Expected workload config " + 
workloadConfig.getQueryWorkloadName()
+                            + " but not found in the expected set");
+        }
+    }
+
+    private TableConfig createTableConfig(String tableName, String serverTag, 
String brokerTenant, TableType type) {
+        return new TableConfigBuilder(type)
+            .setTableName(tableName)
+            
.setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy")
+            .setNumReplicas(1)
+            .setBrokerTenant(brokerTenant)
+            .setServerTenant(serverTag)
+            .setLoadMode("HEAP")
+            .setSegmentVersion(null)
+            .build();
+    }
+
+    private InstanceConfig createInstanceConfig(String instanceName, 
List<String> helixTags) {
+        InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+        for (String helixTag : helixTags) {
+            instanceConfig.addTag(helixTag);
+        }
+        return instanceConfig;
+    }
+
+    private QueryWorkloadConfig createQueryWorkloadConfig(String name, 
PropagationScheme serverScheme,
+                                                          PropagationScheme 
brokerScheme) {
+        EnforcementProfile enforcementProfile = new EnforcementProfile(10, 10);
+        return new QueryWorkloadConfig(name, List.of(
+           new NodeConfig(NodeConfig.Type.SERVER_NODE, enforcementProfile, 
serverScheme),
+           new NodeConfig(NodeConfig.Type.BROKER_NODE, enforcementProfile, 
brokerScheme)
+      ));
+    }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 2fa066e991..6dec9b7903 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -99,6 +99,10 @@ public class Actions {
     public static final String UPDATE_INSTANCE_PARTITIONS = 
"UpdateInstancePartitions";
     public static final String GET_RESPONSE_STORE = "GetResponseStore";
     public static final String DELETE_RESPONSE_STORE = "DeleteResponseStore";
+    public static final String GET_QUERY_WORKLOAD_CONFIG = 
"GetQueryWorkloadConfig";
+    public static final String GET_INSTANCE_QUERY_WORKLOAD_CONFIG = 
"GetInstanceQueryWorkloadConfig";
+    public static final String UPDATE_QUERY_WORKLOAD_CONFIG = 
"UpdateQueryWorkloadConfig";
+    public static final String DELETE_QUERY_WORKLOAD_CONFIG = 
"DeleteQueryWorkloadConfig";
     public static final String GET_GROOVY_STATIC_ANALYZER_CONFIG = 
"GetGroovyStaticAnalyzerConfig";
     public static final String UPDATE_GROOVY_STATIC_ANALYZER_CONFIG = 
"UpdateGroovyStaticAnalyzerConfig";
   }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 76d4a3a39b..adb8509090 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -721,6 +721,12 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
     _helixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
messageHandlerFactory);
+    // Query workload message handler factory
+    QueryWorkloadMessageHandlerFactory queryWorkloadMessageHandlerFactory =
+        new QueryWorkloadMessageHandlerFactory(serverMetrics);
+    _helixManager.getMessagingService()
+        
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            queryWorkloadMessageHandlerFactory);
 
     serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> 
_helixManager.isConnected() ? 1L : 0L);
     _helixManager.addPreConnectCallback(
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
new file mode 100644
index 0000000000..bdf91fc7ff
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryWorkloadMessageHandlerFactory implements 
MessageHandlerFactory {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryWorkloadMessageHandlerFactory.class);
+    private final ServerMetrics _metrics;
+
+    public QueryWorkloadMessageHandlerFactory(ServerMetrics metrics) {
+        _metrics = metrics;
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext 
context) {
+       String messageType = message.getMsgSubType();
+       if 
(messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
+           || 
messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE))
 {
+           return new QueryWorkloadRefreshMessageHandler(new 
QueryWorkloadRefreshMessage(message), context);
+        } else {
+            throw new IllegalArgumentException("Unknown message subtype: " + 
messageType);
+        }
+    }
+
+    // Gets called once during start up. We must return the same message type 
that this factory is registered for.
+    @Override
+    public String getMessageType() {
+        return Message.MessageType.USER_DEFINE_MSG.toString();
+    }
+
+    @Override
+    public void reset() {
+        LOGGER.info("Reset called");
+    }
+
+    private static class QueryWorkloadRefreshMessageHandler extends 
MessageHandler {
+        final String _queryWorkloadName;
+        final InstanceCost _instanceCost;
+
+        QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage 
queryWorkloadRefreshMessage,
+                                           NotificationContext context) {
+            super(queryWorkloadRefreshMessage, context);
+            _queryWorkloadName = 
queryWorkloadRefreshMessage.getQueryWorkloadName();
+            _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+        }
+
+        @Override
+        public HelixTaskResult handleMessage() {
+            // TODO: Add logic to invoke the query workload manager to 
refresh/delete the query workload config
+            HelixTaskResult result = new HelixTaskResult();
+            result.setSuccess(true);
+            return result;
+        }
+
+        @Override
+        public void onError(Exception e, ErrorCode errorCode, ErrorType 
errorType) {
+            LOGGER.error("Got error while refreshing query workload config for 
query workload: {} (error code: {},"
+                    + " error type: {})", _queryWorkloadName, errorCode, 
errorType, e);
+        }
+    }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java
new file mode 100644
index 0000000000..b520e6c4a5
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Defines the resource enforcement profile for a node within a query workload.
+ * <p>
+ * This profile specifies the maximum CPU time (in nanoseconds) and maximum 
memory (in bytes)
+ * that queries under this workload are allowed to consume on the node.
+ * </p>
+ *
+ * @see QueryWorkloadConfig
+ * @see NodeConfig
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class EnforcementProfile extends BaseJsonConfig {
+
+  private static final String CPU_COST_NS = "cpuCostNs";
+  private static final String MEMORY_COST_BYTES = "memoryCostBytes";
+
+  @JsonPropertyDescription("Max CPU cost allowed for the workload")
+  private long _cpuCostNs;
+  @JsonPropertyDescription("Max memory cost allowed for the workload")
+  private long _memoryCostBytes;
+
+  /**
+   * Constructs an EnforcementProfile with specified resource limits.
+   *
+   * @param cpuCostNs maximum allowed CPU cost in nanoseconds for the workload
+   * @param memoryCostBytes maximum allowed memory cost in bytes for the 
workload
+   */
+  public EnforcementProfile(@JsonProperty(CPU_COST_NS) long cpuCostNs,
+                            @JsonProperty(MEMORY_COST_BYTES) long 
memoryCostBytes) {
+    _cpuCostNs = cpuCostNs;
+    _memoryCostBytes = memoryCostBytes;
+  }
+
+  /**
+   * Returns the maximum CPU cost allowed for this workload.
+   *
+   * @return CPU cost limit in nanoseconds
+   */
+  public long getCpuCostNs() {
+    return _cpuCostNs;
+  }
+
+  /**
+   * Returns the maximum memory cost allowed for this workload.
+   *
+   * @return memory cost limit in bytes
+   */
+  public long getMemoryCostBytes() {
+    return _memoryCostBytes;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java
new file mode 100644
index 0000000000..a7befaaed1
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+
+/**
+ * Represents the resource cost profile of an instance for query workload 
enforcement.
+ * <p>
+ * This class defines the CPU and memory cost thresholds that an instance can 
incur
+ * when executing queries, used when propagating workload configurations to 
individual instances.
+ * </p>
+ *
+ */
+public class InstanceCost {
+
+  private static final String CPU_COST_NS = "cpuCostNs";
+  private static final String MEMORY_COST_BYTES = "memoryCostBytes";
+
+  /**
+   * The CPU cost threshold for the instance, in nanoseconds.
+   */
+  @JsonPropertyDescription("CPU cost of the instance in nanoseconds")
+  private long _cpuCostNs;
+  /**
+   * The memory cost threshold for the instance, in bytes.
+   */
+  @JsonPropertyDescription("Memory cost of the instance in bytes")
+  private long _memoryCostBytes;
+
+  /**
+   * Constructs an InstanceCost profile with specified CPU and memory 
thresholds.
+   *
+   * @param cpuCostNs     CPU cost threshold in nanoseconds for this instance
+   * @param memoryCostBytes memory cost threshold in bytes for this instance
+   */
+  @JsonCreator
+  public InstanceCost(@JsonProperty(CPU_COST_NS) long cpuCostNs,
+                      @JsonProperty(MEMORY_COST_BYTES) long memoryCostBytes) {
+    _cpuCostNs = cpuCostNs;
+    _memoryCostBytes = memoryCostBytes;
+  }
+
+  /**
+   * Returns the CPU cost threshold for this instance.
+   *
+   * @return CPU cost limit in nanoseconds
+   */
+  public long getCpuCostNs() {
+    return _cpuCostNs;
+  }
+
+  /**
+   * Returns the memory cost threshold for this instance.
+   *
+   * @return memory cost limit in bytes
+   */
+  public long getMemoryCostBytes() {
+    return _memoryCostBytes;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java
new file mode 100644
index 0000000000..25be645191
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonValue;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Represents the configuration for a specific node type in a query workload.
+ * <p>
+ * Each NodeConfig specifies:
+ * <ul>
+ *   <li><strong>Node Type:</strong> The role of the node in processing 
queries.</li>
+ *   <li><strong>Enforcement Profile:</strong> Resource limits (CPU and 
memory) applied to this node.</li>
+ *   <li><strong>Propagation Scheme:</strong> Optional instructions for 
cascading configs to downstream nodes.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * This class is used within {@link QueryWorkloadConfig} to define per-node 
settings
+ * that tailor query execution behavior based on workload classification.
+ * </p>
+ *
+ * @see QueryWorkloadConfig
+ * @see EnforcementProfile
+ * @see PropagationScheme
+ */
+public class NodeConfig extends BaseJsonConfig {
+
+  public enum Type {
+    BROKER_NODE("brokerNode"),
+    SERVER_NODE("serverNode");
+
+    private final String _value;
+
+    Type(String jsonValue) {
+      _value = jsonValue;
+    }
+
+    @JsonValue
+    public String getJsonValue() {
+      return _value;
+    }
+
+    @JsonCreator
+    public static Type forValue(String value) {
+      if (value == null) {
+        return null;
+      }
+      // Normalize the input to lower case and trim spaces
+      String normalized = value.toLowerCase().trim();
+      for (Type type : Type.values()) {
+        if (type.getJsonValue().toLowerCase().equals(normalized)) {
+          return type;
+        }
+      }
+      throw new IllegalArgumentException("Invalid node type: " + value);
+    }
+  }
+
+  private static final String NODE_TYPE = "nodeType";
+  private static final String ENFORCEMENT_PROFILE = "enforcementProfile";
+  private static final String PROPAGATION_SCHEME = "propagationScheme";
+
+  /**
+   * The role of this node within the query workload, indicating whether it 
directly serves
+   * queries or acts as an intermediate forwarding node.
+   */
+  @JsonPropertyDescription("Describes the type of node")
+  private Type _nodeType;
+
+  /**
+   * The resource enforcement profile for this node, defining limits on CPU 
and memory
+   * usage for queries under this workload.
+   */
+  @JsonPropertyDescription("Describes the enforcement profile for the node")
+  private EnforcementProfile _enforcementProfile;
+
+  /**
+   * Optional propagation scheme that specifies how configuration settings are 
cascaded
+   * or shared with downstream nodes; may be null if no propagation is applied.
+   */
+  @JsonPropertyDescription("Describes the propagation scheme for the node")
+  private PropagationScheme _propagationScheme;
+
+  @JsonCreator
+  public NodeConfig(
+      @JsonProperty(NODE_TYPE) Type nodeType,
+      @JsonProperty(ENFORCEMENT_PROFILE) EnforcementProfile enforcementProfile,
+      @JsonProperty(PROPAGATION_SCHEME) @Nullable PropagationScheme 
propagationScheme) {
+    _nodeType = nodeType;
+    _enforcementProfile = enforcementProfile;
+    _propagationScheme = propagationScheme;
+  }
+
+  public Type getNodeType() {
+      return _nodeType;
+  }
+
+  public EnforcementProfile getEnforcementProfile() {
+    return _enforcementProfile;
+  }
+
+  public PropagationScheme getPropagationScheme() {
+    return _propagationScheme;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java
new file mode 100644
index 0000000000..ac68bbea68
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.List;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Defines how configuration settings are propagated across workloads.
+ * <p>
+ * A PropagationScheme determines the scope and specific values (e.g., tables 
or tenants)
+ * to which workload settings should be applied. This allows selective 
cascading
+ * of resource and query limits across different instances.
+ * </p>
+ *
+ * @see QueryWorkloadConfig
+ * @see NodeConfig
+ */
+public class PropagationScheme extends BaseJsonConfig {
+
+  /**
+   * Enumerates the propagation scheme types that control the scope of 
propagation.
+   * <p>
+   * - TABLE: Propagate settings at the per-table level.<br>
+   * - TENANT: Propagate settings at the tenant (logical group) level.
+   * </p>
+   */
+  public enum Type {
+    /** Propagate workload settings to individual tables. */
+    TABLE("table"),
+    /** Propagate workload settings to all tables under a tenant. */
+    TENANT("tenant");
+
+    private final String _value;
+
+    Type(String value) {
+      _value = value;
+    }
+
+    /**
+     * Returns the JSON string representation of this propagation type.
+     *
+     * @return the JSON value corresponding to this Type (e.g., "table", 
"tenant")
+     */
+    @JsonValue
+    public String getJsonValue() {
+      return _value;
+    }
+
+    /**
+     * Parses a JSON string into the corresponding Type enum.
+     * <p>
+     * Accepts case-insensitive and trimmed input matching defined JSON values.
+     * </p>
+     *
+     * @param value JSON string to parse (may be null)
+     * @return the matching Type enum, or null if input is null
+     * @throws IllegalArgumentException if the input does not match any Type
+     */
+    @JsonCreator
+    public static Type forValue(String value) {
+      if (value == null) {
+        return null;
+      }
+      String normalized = value.toLowerCase().trim();
+      for (Type type : Type.values()) {
+        if (type.getJsonValue().equals(normalized)) {
+          return type;
+        }
+      }
+      throw new IllegalArgumentException("Invalid propagation scheme type: " + 
value);
+    }
+  }
+
+  private static final String PROPAGATION_TYPE = "propagationType";
+  private static final String VALUES = "values";
+
+  /**
+   * The type of propagation to apply (per-table or per-tenant).
+   */
+  @JsonPropertyDescription("Describes the type of propagation scheme")
+  private Type _propagationType;
+
+  /**
+   * The specific identifiers (table names or tenant names) to which settings 
apply.
+   */
+  @JsonPropertyDescription("Describes the values of the propagation scheme")
+  private List<String> _values;
+
+  /**
+   * Constructs a PropagationScheme with the given type and target values.
+   *
+   * @param propagationType the Type of propagation (TABLE or TENANT)
+   * @param values the list of identifiers (tables or tenants) for propagation
+   */
+  @JsonCreator
+  public PropagationScheme(@JsonProperty(PROPAGATION_TYPE) Type 
propagationType,
+      @JsonProperty(VALUES) List<String> values) {
+    _propagationType = propagationType;
+    _values = values;
+  }
+
+  /**
+   * Returns the configured propagation type.
+   *
+   * @return the Type enum indicating propagation scope
+   */
+  public Type getPropagationType() {
+    return _propagationType;
+  }
+
+  /**
+   * Returns the list of target identifiers for propagation.
+   *
+   * @return list of table names or tenant names
+   */
+  public List<String> getValues() {
+    return _values;
+  }
+
+  /**
+   * Sets the propagation type.
+   *
+   * @param propagationType new Type to define propagation scope
+   */
+  public void setPropagationType(Type propagationType) {
+    _propagationType = propagationType;
+  }
+
+  /**
+   * Sets the target identifiers for propagation.
+   *
+   * @param values list of table or tenant names to apply settings to
+   */
+  public void setValues(List<String> values) {
+    _values = values;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/QueryWorkloadConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/QueryWorkloadConfig.java
new file mode 100644
index 0000000000..477a7f5668
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/QueryWorkloadConfig.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.List;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Represents the configuration for a named query workload in a Pinot Helix 
cluster.
+ * <p>
+ * A QueryWorkload groups a set of nodes and associated configuration 
parameters under a single workload name.
+ * Workloads are applied at the cluster level: individual queries specify the 
workload they belong to by
+ * providing the workload name in their query options. This allows us to 
manage and enforce isolation by limiting
+ * the resources available to queries based on their workload classification.
+ * </p>
+ * <p><strong>Example:</strong></p>
+ * <pre>{@code
+ * {
+ *   "queryWorkloadName": "analytics",
+ *   "nodeConfigs": [
+ *     {
+ *       "nodeType": "brokerNode",
+ *       "enforcementProfile": {
+ *        "cpuCostNs": 1000000,
+ *        "memoryCostBytes": 10000000
+ *       },
+ *       "propagationScheme": {
+ *         "propagationType": "TABLE",
+ *         "values": ["airlineStats"]
+ *       }
+ *      },
+ *     {
+ *       "nodeType": "serverNode",
+ *       "enforcementProfile": {
+ *        "cpuCostNs": 2000000,
+ *        "memoryCostBytes": 20000000
+ *       },
+ *       "propagationScheme": {
+ *        "propagationType": "TENANT",
+ *        "values": ["tenantA", "tenantB"]
+ *       }
+ *     }
+ *   ]
+ * }
+ * }</pre>
+ *
+ * @see NodeConfig
+ */
+public class QueryWorkloadConfig extends BaseJsonConfig {
+
+  public static final String QUERY_WORKLOAD_NAME = "queryWorkloadName";
+  public static final String NODE_CONFIGS = "nodeConfigs";
+
+  @JsonPropertyDescription("Describes the name for the query workload, this 
should be unique across the zk cluster")
+  private String _queryWorkloadName;
+
+  @JsonPropertyDescription("Describes the node configs for the query workload")
+  private List<NodeConfig> _nodeConfigs;
+
+  @JsonCreator
+  /**
+   * Constructs a new QueryWorkloadConfig instance.
+   *
+   * @param queryWorkloadName unique name identifying this workload across the 
cluster
+   * @param nodeConfigs list of per-node configurations for this workload
+   */
+  public QueryWorkloadConfig(@JsonProperty(QUERY_WORKLOAD_NAME) String 
queryWorkloadName,
+      @JsonProperty(NODE_CONFIGS) List<NodeConfig> nodeConfigs) {
+    _queryWorkloadName = queryWorkloadName;
+    _nodeConfigs = nodeConfigs;
+  }
+
+  /**
+   * Returns the unique name of this query workload.
+   *
+   * @return the workload name used by queries to specify this workload
+   */
+  public String getQueryWorkloadName() {
+    return _queryWorkloadName;
+  }
+
+  /**
+   * Returns the list of node-specific configurations for this workload.
+   *
+   * @return list of NodeConfig objects for this workload
+   */
+  public List<NodeConfig> getNodeConfigs() {
+    return _nodeConfigs;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index bd76f381c0..81af254f52 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -72,6 +72,7 @@ public class CommonConstants {
   public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1";
 
   public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME = 
"pinot.tar.compression.codec.name";
+  public static final String QUERY_WORKLOAD = "queryWorkload";
 
   public static final String JFR = "pinot.jfr";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to