This is an automated email from the ASF dual-hosted git repository. vvivekiyer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 26ce91bddd [Query Resource Isolation] Workload Configs (#15109) 26ce91bddd is described below commit 26ce91bdddfde13cc78fb960477f56f21662d2a1 Author: Praveen <praveenkchagan...@gmail.com> AuthorDate: Fri Jun 13 09:28:19 2025 -0700 [Query Resource Isolation] Workload Configs (#15109) * Workload Configs * workload config * Add API * config * Change config structure * Propagation strategy * Fix style check * Cost spliting on update * Table addition propagation * perf * Tests * test * test 2 * Review comments 1 * review comments 3 * review comments 3 * name change * review comments 4 --- .../BrokerUserDefinedMessageHandlerFactory.java | 31 ++ .../messages/QueryWorkloadRefreshMessage.java | 69 +++++ .../pinot/common/metadata/ZKMetadataProvider.java | 57 ++++ .../utils/config/QueryWorkloadConfigUtils.java | 234 +++++++++++++++ .../utils/config/QueryWorkloadConfigUtilsTest.java | 199 ++++++++++++ .../pinot/controller/api/resources/Constants.java | 1 + .../PinotQueryWorkloadRestletResource.java | 332 +++++++++++++++++++++ .../helix/core/PinotHelixResourceManager.java | 61 +++- .../controller/workload/QueryWorkloadManager.java | 203 +++++++++++++ .../workload/scheme/DefaultPropagationScheme.java | 51 ++++ .../workload/scheme/PropagationScheme.java | 36 +++ .../workload/scheme/PropagationSchemeProvider.java | 42 +++ .../workload/scheme/PropagationUtils.java | 211 +++++++++++++ .../workload/scheme/TablePropagationScheme.java | 78 +++++ .../workload/scheme/TenantPropagationScheme.java | 72 +++++ .../controller/workload/splitter/CostSplitter.java | 48 +++ .../workload/splitter/DefaultCostSplitter.java | 51 ++++ .../controller/workload/PropagationUtilsTest.java | 214 +++++++++++++ .../java/org/apache/pinot/core/auth/Actions.java | 4 + .../server/starter/helix/BaseServerStarter.java | 6 + .../helix/QueryWorkloadMessageHandlerFactory.java | 87 ++++++ .../spi/config/workload/EnforcementProfile.java | 78 +++++ 
.../pinot/spi/config/workload/InstanceCost.java | 79 +++++ .../pinot/spi/config/workload/NodeConfig.java | 126 ++++++++ .../spi/config/workload/PropagationScheme.java | 158 ++++++++++ .../spi/config/workload/QueryWorkloadConfig.java | 109 +++++++ .../apache/pinot/spi/utils/CommonConstants.java | 1 + 27 files changed, 2637 insertions(+), 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java index 81ea3d0d4f..1b2e7ed045 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java @@ -28,10 +28,12 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage; import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage; +import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.TableConfigRefreshMessage; import org.apache.pinot.common.utils.DatabaseUtils; +import org.apache.pinot.spi.config.workload.InstanceCost; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,9 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac return new RefreshDatabaseConfigMessageHandler(new DatabaseConfigRefreshMessage(message), context); case ApplicationQpsQuotaRefreshMessage.REFRESH_APP_QUOTA_MSG_SUB_TYPE: return new RefreshApplicationQpsQuotaMessageHandler(new ApplicationQpsQuotaRefreshMessage(message), context); + case 
QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE: + case QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE: + return new QueryWorkloadRefreshMessageHandler(new QueryWorkloadRefreshMessage(message), context); default: // NOTE: Log a warning and return no-op message handler for unsupported message sub-types. This can happen when // a new message sub-type is added, and the sender gets deployed first while receiver is still running the @@ -259,4 +264,30 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac LOGGER.error("Got error for no-op message handling (error code: {}, error type: {})", code, type, e); } } + + private static class QueryWorkloadRefreshMessageHandler extends MessageHandler { + final String _queryWorkloadName; + final InstanceCost _instanceCost; + + QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage, + NotificationContext context) { + super(queryWorkloadRefreshMessage, context); + _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName(); + _instanceCost = queryWorkloadRefreshMessage.getInstanceCost(); + } + + @Override + public HelixTaskResult handleMessage() { + // TODO: Add logic to invoke the query workload manager to refresh/delete the query workload config + HelixTaskResult result = new HelixTaskResult(); + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) { + LOGGER.error("Got error while refreshing query workload config for query workload: {} (error code: {}," + + " error type: {})", _queryWorkloadName, errorCode, errorType, e); + } + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java new file mode 100644 index 0000000000..85f4da123e --- /dev/null +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.helix.model.Message; +import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils; +import org.apache.pinot.spi.config.workload.InstanceCost; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; + + +/** + * Message to refresh the query workload on the instances. + * This message includes the host level cost for each instance. + */ +public class QueryWorkloadRefreshMessage extends Message { + public static final String REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE = "REFRESH_QUERY_WORKLOAD"; + public static final String DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE = "DELETE_QUERY_WORKLOAD"; + public static final String QUERY_WORKLOAD_NAME = "queryWorkloadName"; + public static final String INSTANCE_COST = "instanceCost"; + + /** + * Constructor for the sender. 
+ */ + public QueryWorkloadRefreshMessage(String queryWorkloadName, String messageSubType, + @Nullable InstanceCost instanceCost) { + super(Message.MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setMsgSubType(messageSubType); + // Give it infinite time to process the message, as long as session is alive + setExecutionTimeout(-1); + QueryWorkloadConfigUtils.updateZNRecordWithInstanceCost(getRecord(), queryWorkloadName, instanceCost); + } + + /** + * Constructor for the receiver. + */ + public QueryWorkloadRefreshMessage(Message message) { + super(message.getRecord()); + if (!message.getMsgSubType().equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE) + || !message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) { + throw new IllegalArgumentException("Unknown message subtype:" + message.getMsgSubType()); + } + } + + public String getQueryWorkloadName() { + return getRecord().getSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME); + } + + public InstanceCost getInstanceCost() { + return QueryWorkloadConfigUtils.getInstanceCostFromZNRecord(getRecord()); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index dfb2bbe401..cfb4ad4546 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -43,12 +43,14 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.LogicalTableConfigUtils; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; +import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.spi.config.ConfigUtils; import org.apache.pinot.spi.config.DatabaseConfig; import 
org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.user.UserConfig; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -82,6 +84,7 @@ public class ZKMetadataProvider { private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER"; private static final String PROPERTYSTORE_SEGMENT_LINEAGE = "/SEGMENT_LINEAGE"; private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = "/MINION_TASK_METADATA"; + private static final String PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX = "/CONFIGS/QUERYWORKLOAD"; public static void setUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username, ZNRecord znRecord) { propertyStore.set(constructPropertyStorePathForUserConfig(username), znRecord, AccessOption.PERSISTENT); @@ -305,6 +308,14 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType); } + public static String getPropertyStoreWorkloadConfigsPrefix() { + return PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX; + } + + public static String constructPropertyStorePathForQueryWorkloadConfig(String workloadName) { + return StringUtil.join("/", PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX, workloadName); + } + @Deprecated public static String constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType, String tableNameWithType) { @@ -837,6 +848,52 @@ public class ZKMetadataProvider { } } + public static List<QueryWorkloadConfig> getAllQueryWorkloadConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) { + List<ZNRecord> znRecords = + propertyStore.getChildren(getPropertyStoreWorkloadConfigsPrefix(), null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, 
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); + if (znRecords == null) { + return Collections.emptyList(); + } + int numZNRecords = znRecords.size(); + List<QueryWorkloadConfig> queryWorkloadConfigs = new ArrayList<>(numZNRecords); + for (ZNRecord znRecord : znRecords) { + queryWorkloadConfigs.add(QueryWorkloadConfigUtils.fromZNRecord(znRecord)); + } + return queryWorkloadConfigs; + } + + @Nullable + public static QueryWorkloadConfig getQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, + String workloadName) { + ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForQueryWorkloadConfig(workloadName), + null, AccessOption.PERSISTENT); + if (znRecord == null) { + return null; + } + return QueryWorkloadConfigUtils.fromZNRecord(znRecord); + } + + public static boolean setQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, + QueryWorkloadConfig queryWorkloadConfig) { + String path = constructPropertyStorePathForQueryWorkloadConfig(queryWorkloadConfig.getQueryWorkloadName()); + boolean isNewConfig = !propertyStore.exists(path, AccessOption.PERSISTENT); + ZNRecord znRecord = isNewConfig ? new ZNRecord(queryWorkloadConfig.getQueryWorkloadName()) + : propertyStore.get(path, null, AccessOption.PERSISTENT); + // Update the record with new workload configuration + QueryWorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord, queryWorkloadConfig); + // Create or update based on existence + return isNewConfig ? 
propertyStore.create(path, znRecord, AccessOption.PERSISTENT) + : propertyStore.set(path, znRecord, AccessOption.PERSISTENT); + } + + public static void deleteQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String workloadName) { + String propertyStorePath = constructPropertyStorePathForQueryWorkloadConfig(workloadName); + if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { + propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT); + } + } + public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, LogicalTableConfig logicalTableConfig) { try { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java new file mode 100644 index 0000000000..034e4724f0 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.utils.config; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.common.utils.http.HttpClientConfig; +import org.apache.pinot.common.utils.tls.TlsUtils; +import org.apache.pinot.spi.config.workload.EnforcementProfile; +import org.apache.pinot.spi.config.workload.InstanceCost; +import org.apache.pinot.spi.config.workload.NodeConfig; +import org.apache.pinot.spi.config.workload.PropagationScheme; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; + + +public class QueryWorkloadConfigUtils { + private QueryWorkloadConfigUtils() { + } + + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(QueryWorkloadConfigUtils.class); + private static final HttpClient HTTP_CLIENT = new HttpClient(HttpClientConfig.DEFAULT_HTTP_CLIENT_CONFIG, + TlsUtils.getSslContext()); + + /** + * Converts a ZNRecord into a QueryWorkloadConfig object by extracting mapFields. + * + * @param znRecord The ZNRecord containing workload config data. + * @return A QueryWorkloadConfig object. 
+ */ + public static QueryWorkloadConfig fromZNRecord(ZNRecord znRecord) { + Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null"); + String queryWorkloadName = znRecord.getSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME); + Preconditions.checkNotNull(queryWorkloadName, "queryWorkloadName cannot be null"); + String nodeConfigsJson = znRecord.getSimpleField(QueryWorkloadConfig.NODE_CONFIGS); + Preconditions.checkNotNull(nodeConfigsJson, "nodeConfigs cannot be null"); + try { + List<NodeConfig> nodeConfigs = JsonUtils.stringToObject(nodeConfigsJson, new TypeReference<>() { }); + return new QueryWorkloadConfig(queryWorkloadName, nodeConfigs); + } catch (Exception e) { + String errorMessage = String.format("Failed to convert ZNRecord : %s to QueryWorkloadConfig", znRecord); + throw new RuntimeException(errorMessage, e); + } + } + + /** + * Updates a ZNRecord with the fields from a WorkloadConfig object. + * + * @param queryWorkloadConfig The QueryWorkloadConfig object to convert. + * @param znRecord The ZNRecord to update. 
+ */ + public static void updateZNRecordWithWorkloadConfig(ZNRecord znRecord, QueryWorkloadConfig queryWorkloadConfig) { + znRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, queryWorkloadConfig.getQueryWorkloadName()); + try { + znRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, + JsonUtils.objectToString(queryWorkloadConfig.getNodeConfigs())); + } catch (Exception e) { + String errorMessage = String.format("Failed to convert QueryWorkloadConfig : %s to ZNRecord", + queryWorkloadConfig); + throw new RuntimeException(errorMessage, e); + } + } + + public static void updateZNRecordWithInstanceCost(ZNRecord znRecord, String queryWorkloadName, + InstanceCost instanceCost) { + Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null"); + Preconditions.checkNotNull(instanceCost, "InstanceCost cannot be null"); + try { + znRecord.setSimpleField(QueryWorkloadRefreshMessage.QUERY_WORKLOAD_NAME, queryWorkloadName); + znRecord.setSimpleField(QueryWorkloadRefreshMessage.INSTANCE_COST, JsonUtils.objectToString(instanceCost)); + } catch (Exception e) { + String errorMessage = String.format("Failed to convert InstanceCost : %s to ZNRecord", + instanceCost); + throw new RuntimeException(errorMessage, e); + } + } + + public static InstanceCost getInstanceCostFromZNRecord(ZNRecord znRecord) { + Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null"); + String instanceCostJson = znRecord.getSimpleField(QueryWorkloadRefreshMessage.INSTANCE_COST); + Preconditions.checkNotNull(instanceCostJson, "InstanceCost cannot be null"); + try { + return JsonUtils.stringToObject(instanceCostJson, InstanceCost.class); + } catch (Exception e) { + String errorMessage = String.format("Failed to convert ZNRecord : %s to InstanceCost", znRecord); + throw new RuntimeException(errorMessage, e); + } + } + /** + * Fetches query workload configs for a specific instance from the controller. + * + * @param controllerUrl The URL of the controller. 
+ * @param instanceId The ID of the instance to fetch configs for. + * @param nodeType The type of node (e.g., BROKER, SERVER). + * @return A map of workload names to their corresponding InstanceCost objects. + */ + public static Map<String, InstanceCost> getQueryWorkloadConfigsFromController(String controllerUrl, String instanceId, + NodeConfig.Type nodeType) { + try { + if (controllerUrl == null || controllerUrl.isEmpty()) { + LOGGER.warn("Controller URL is empty, cannot fetch query workload configs for instance: {}", instanceId); + return Collections.emptyMap(); + } + URI queryWorkloadURI = new URI(controllerUrl + "/queryWorkloadConfigs/instance/" + instanceId + "?nodeType=" + + nodeType); + ClassicHttpRequest request = ClassicRequestBuilder.get(queryWorkloadURI) + .setVersion(HttpVersion.HTTP_1_1) + .setHeader(HttpHeaders.CONTENT_TYPE, HttpClient.JSON_CONTENT_TYPE) + .build(); + AtomicReference<Map<String, InstanceCost>> workloadToInstanceCost = new AtomicReference<>(null); + RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(3, 3000L, 1.2f); + retryPolicy.attempt(() -> { + try { + SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException( + HTTP_CLIENT.sendRequest(request, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS) + ); + if (response.getStatusCode() == HttpStatus.SC_OK) { + workloadToInstanceCost.set(JsonUtils.stringToObject(response.getResponse(), new TypeReference<>() { })); + LOGGER.info("Successfully fetched query workload configs from controller: {}, Instance: {}", + controllerUrl, instanceId); + return true; + } else if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) { + LOGGER.info("No query workload configs found for controller: {}, Instance: {}", controllerUrl, instanceId); + workloadToInstanceCost.set(Collections.emptyMap()); + return true; + } else { + LOGGER.warn("Failed to fetch query workload configs from controller: {}, Instance: {}, Status: {}", + controllerUrl, instanceId, response.getStatusCode()); + return 
false; + } + } catch (Exception e) { + LOGGER.warn("Failed to fetch query workload configs from controller: {}, Instance: {}", + controllerUrl, instanceId, e); + return false; + } + }); + return workloadToInstanceCost.get(); + } catch (Exception e) { + LOGGER.warn("Failed to fetch query workload configs from controller: {}, Instance: {}", + controllerUrl, instanceId, e); + return Collections.emptyMap(); + } + } + + /** + * Validates the given QueryWorkloadConfig and returns a list of validation error messages. + * + * @param config the QueryWorkloadConfig to validate + * @return a list of validation errors; empty if config is valid + */ + public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig config) { + List<String> errors = new ArrayList<>(); + if (config == null) { + errors.add("QueryWorkloadConfig cannot be null"); + return errors; + } + String name = config.getQueryWorkloadName(); + if (name == null || name.trim().isEmpty()) { + errors.add("queryWorkloadName cannot be null or empty"); + } + List<NodeConfig> nodeConfigs = config.getNodeConfigs(); + if (nodeConfigs == null || nodeConfigs.isEmpty()) { + errors.add("nodeConfigs cannot be null or empty"); + } else { + for (int i = 0; i < nodeConfigs.size(); i++) { + NodeConfig nodeConfig = nodeConfigs.get(i); + String prefix = "nodeConfigs[" + i + "]"; + if (nodeConfig == null) { + errors.add(prefix + " cannot be null"); + continue; + } + if (nodeConfig.getNodeType() == null) { + errors.add(prefix + ".type cannot be null"); + } + // Validate EnforcementProfile + EnforcementProfile enforcementProfile = nodeConfig.getEnforcementProfile(); + if (enforcementProfile == null) { + errors.add(prefix + "enforcementProfile cannot be null"); + } else { + if (enforcementProfile.getCpuCostNs() < 0) { + errors.add(prefix + ".enforcementProfile.cpuCostNs cannot be negative"); + } + if (enforcementProfile.getMemoryCostBytes() < 0) { + errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot be 
negative"); + } + } + // Validate PropagationScheme + PropagationScheme propagationScheme = nodeConfig.getPropagationScheme(); + if (propagationScheme == null) { + errors.add(prefix + ".propagationScheme cannot be null"); + } else { + if (propagationScheme.getPropagationType() == null) { + errors.add(prefix + ".propagationScheme.type cannot be null"); + } + } + } + } + return errors; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java new file mode 100644 index 0000000000..0bbe84b7fe --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.utils.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.List; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.config.workload.EnforcementProfile; +import org.apache.pinot.spi.config.workload.NodeConfig; +import org.apache.pinot.spi.config.workload.PropagationScheme; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + +public class QueryWorkloadConfigUtilsTest { + + @Test(dataProvider = "fromZNRecordDataProvider") + public void testFromZNRecord(ZNRecord znRecord, QueryWorkloadConfig expectedQueryWorkloadConfig, boolean shouldFail) { + try { + QueryWorkloadConfig actualQueryWorkloadConfig = QueryWorkloadConfigUtils.fromZNRecord(znRecord); + if (shouldFail) { + Assert.fail("Expected an exception but none was thrown"); + } + Assert.assertEquals(actualQueryWorkloadConfig, expectedQueryWorkloadConfig); + } catch (Exception e) { + if (!shouldFail) { + Assert.fail("Caught unexpected exception: " + e.getMessage(), e); + } + } + } + + @DataProvider(name = "fromZNRecordDataProvider") + public Object[][] fromZNRecordDataProvider() throws JsonProcessingException { + List<Object[]> data = new ArrayList<>(); + + // Shared, valid configuration + EnforcementProfile validEnforcementProfile = new EnforcementProfile(100, 100); + + // Server node + PropagationScheme serverPropagationScheme = new PropagationScheme(PropagationScheme.Type.TABLE, + List.of("value1", "value2")); + NodeConfig serverNodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile, + serverPropagationScheme); + + // Broker node + PropagationScheme brokerPropagationScheme = new PropagationScheme(PropagationScheme.Type.TENANT, + List.of("value3", "value4")); + NodeConfig brokerNodeConfig 
= new NodeConfig(NodeConfig.Type.BROKER_NODE, validEnforcementProfile, + brokerPropagationScheme); + + List<NodeConfig> nodeConfigs = List.of(serverNodeConfig, brokerNodeConfig); + QueryWorkloadConfig validQueryWorkloadConfig = new QueryWorkloadConfig("workloadId", nodeConfigs); + + // Valid scenario: NODE_CONFIGS field is a JSON array string + ZNRecord validZnRecord = new ZNRecord("workloadId"); + validZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + validZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, JsonUtils.objectToString(nodeConfigs)); + data.add(new Object[] { validZnRecord, validQueryWorkloadConfig, false }); + + // Null propagation scheme + NodeConfig nodeConfigWithoutPropagationScheme = new NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile, + null); + List<NodeConfig> nodeConfigsWithoutPropagation = List.of(nodeConfigWithoutPropagationScheme); + ZNRecord znRecordNullPropagation = new ZNRecord("workloadId"); + znRecordNullPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + znRecordNullPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, + JsonUtils.objectToString(nodeConfigsWithoutPropagation)); + QueryWorkloadConfig expectedQueryWorkloadConfigNullPropagation = new QueryWorkloadConfig("workloadId", + nodeConfigsWithoutPropagation); + data.add(new Object[] { znRecordNullPropagation, expectedQueryWorkloadConfigNullPropagation, false }); + + // Missing NODE_CONFIGS field + ZNRecord missingNodeConfigsZnRecord = new ZNRecord("workloadId"); + missingNodeConfigsZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + data.add(new Object[] { missingNodeConfigsZnRecord, null, true }); + + // Invalid JSON in NODE_CONFIGS field + ZNRecord invalidJsonZnRecord = new ZNRecord("workloadId"); + invalidJsonZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + 
invalidJsonZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, "{invalidJsonField: }"); + data.add(new Object[] { invalidJsonZnRecord, null, true }); + + return data.toArray(new Object[0][]); + } + + @Test(dataProvider = "updateZNRecordDataProvider") + public void testUpdateZNRecordWithWorkloadConfig(QueryWorkloadConfig queryWorkloadConfig, ZNRecord znRecord, + ZNRecord expectedZnRecord, boolean shouldFail) { + try { + QueryWorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord, queryWorkloadConfig); + if (shouldFail) { + Assert.fail("Expected an exception but none was thrown"); + } + Assert.assertEquals(znRecord, expectedZnRecord); + } catch (Exception e) { + if (!shouldFail) { + Assert.fail("Caught unexpected exception: " + e.getMessage(), e); + } + } + } + + @DataProvider(name = "updateZNRecordDataProvider") + public Object[][] updateZNRecordDataProvider() throws JsonProcessingException { + List<Object[]> data = new ArrayList<>(); + + EnforcementProfile validEnforcementProfile = new EnforcementProfile(100, 100); + // Server scheme + PropagationScheme serverPropagationScheme = new PropagationScheme(PropagationScheme.Type.TABLE, + List.of("value1", "value2")); + NodeConfig serverNodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile, + serverPropagationScheme); + // Broker scheme + PropagationScheme brokerPropagationScheme = new PropagationScheme(PropagationScheme.Type.TENANT, + List.of("value3", "value4")); + NodeConfig brokerNodeConfig = new NodeConfig(NodeConfig.Type.BROKER_NODE, validEnforcementProfile, + brokerPropagationScheme); + List<NodeConfig> nodeConfigs = List.of(serverNodeConfig, brokerNodeConfig); + QueryWorkloadConfig validQueryWorkloadConfig = new QueryWorkloadConfig("workloadId", nodeConfigs); + + // 1) Valid scenario + ZNRecord validZnRecord = new ZNRecord("validId"); + ZNRecord expectedValidZnRecord = new ZNRecord("validId"); + validZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, 
"workloadId"); + expectedValidZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + String nodeConfigsJson = JsonUtils.objectToString(nodeConfigs); + validZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, nodeConfigsJson); + expectedValidZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, nodeConfigsJson); + data.add(new Object[] { validQueryWorkloadConfig, validZnRecord, expectedValidZnRecord, false }); + + // 2) Null propagation scheme in both nodes + NodeConfig nodeConfigWithoutPropagation = new NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile, + null); + List<NodeConfig> nodeConfigsWithoutPropagation = List.of(nodeConfigWithoutPropagation); + QueryWorkloadConfig configWithoutPropagation = new QueryWorkloadConfig("noPropagation", + nodeConfigsWithoutPropagation); + + String nodeConfigsNoPropagationJson = JsonUtils.objectToString(nodeConfigsWithoutPropagation); + + ZNRecord znRecordNoPropagation = new ZNRecord("noPropagationId"); + ZNRecord expectedZnRecordNoPropagation = new ZNRecord("noPropagationId"); + znRecordNoPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "noPropagation"); + znRecordNoPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, nodeConfigsNoPropagationJson); + + expectedZnRecordNoPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "noPropagation"); + expectedZnRecordNoPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, nodeConfigsNoPropagationJson); + data.add(new Object[] { configWithoutPropagation, znRecordNoPropagation, expectedZnRecordNoPropagation, false }); + + // 3) Null server node in QueryWorkloadConfig + List<NodeConfig> nodeConfigsWithNullServerNode = List.of(brokerNodeConfig); + QueryWorkloadConfig nullServerNodeConfig = new QueryWorkloadConfig("nullServer", nodeConfigsWithNullServerNode); + ZNRecord znRecordNullServer = new ZNRecord("nullServerId"); + ZNRecord expectedZnRecordNullServer = new ZNRecord("nullServerId"); + 
znRecordNullServer.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "nullServer"); + expectedZnRecordNullServer.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "nullServer"); + String nodeConfigsWithNullServerJson = JsonUtils.objectToString(nodeConfigsWithNullServerNode); + znRecordNullServer.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, nodeConfigsWithNullServerJson); + expectedZnRecordNullServer.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, nodeConfigsWithNullServerJson); + data.add(new Object[] { nullServerNodeConfig, znRecordNullServer, expectedZnRecordNullServer, false }); + + // 4) Null QueryWorkloadConfig -> should fail + ZNRecord znRecordNullConfig = new ZNRecord("nullConfigId"); + data.add(new Object[] { null, znRecordNullConfig, null, true }); + + // 5) Null ZNRecord -> should fail + data.add(new Object[] { validQueryWorkloadConfig, null, null, true }); + + // 6) Behavior with empty ZNRecord ID + ZNRecord emptyIdZnRecord = new ZNRecord(""); + ZNRecord expectedEmptyIdZnRecord = new ZNRecord(""); + emptyIdZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + expectedEmptyIdZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME, "workloadId"); + String emptyNodeConfigsJson = JsonUtils.objectToString(nodeConfigs); + emptyIdZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, emptyNodeConfigsJson); + expectedEmptyIdZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS, emptyNodeConfigsJson); + data.add(new Object[] { validQueryWorkloadConfig, emptyIdZnRecord, expectedEmptyIdZnRecord, false }); + + return data.toArray(new Object[0][]); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java index fea05fc8b2..06305deb4c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java @@ -53,6 +53,7 @@ public class Constants { public static final String APP_CONFIGS = "AppConfigs"; public static final String PERIODIC_TASK_TAG = "PeriodicTask"; public static final String UPSERT_RESOURCE_TAG = "Upsert"; + public static final String QUERY_WORKLOAD_TAG = "QueryWorkload"; public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = "RealtimeSegmentValidationManager"; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java new file mode 100644 index 0000000000..35bdf6f9a2 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.auth.Actions; +import org.apache.pinot.core.auth.Authorize; +import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.spi.config.workload.InstanceCost; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + +@Api(tags = Constants.QUERY_WORKLOAD_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { + @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = + SWAGGER_AUTHORIZATION_KEY, description = + "The format of the key is ```\"Basic <token>\" or \"Bearer " 
+ + "<token>\"```"), @ApiKeyAuthDefinition(name = CommonConstants.QUERY_WORKLOAD, in = + ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = CommonConstants.QUERY_WORKLOAD, description = + "Workload context passed through http header. If no context is provided 'default' workload " + + "context will be considered.") +})) +@Path("/") +public class PinotQueryWorkloadRestletResource { + public static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryWorkloadRestletResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/queryWorkloadConfigs") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_QUERY_WORKLOAD_CONFIG) + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get all query workload configs", notes = "Get all workload configs") + public String getQueryWorkloadConfigs(@Context HttpHeaders httpHeaders) { + try { + LOGGER.info("Received request to get all queryWorkloadConfigs"); + List<QueryWorkloadConfig> queryWorkloadConfigs = _pinotHelixResourceManager.getAllQueryWorkloadConfigs(); + if (queryWorkloadConfigs.isEmpty()) { + return JsonUtils.objectToString(Map.of()); + } + String response = JsonUtils.objectToString(queryWorkloadConfigs); + LOGGER.info("Successfully fetched all queryWorkloadConfigs"); + return response; + } catch (Exception e) { + String errorMessage = String.format("Error while getting all workload configs, error: %s", e); + throw new ControllerApplicationException(LOGGER, errorMessage, Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * API to fetch query workload config + * @param queryWorkloadName Name of the query workload + * Example request: + * /queryWorkloadConfigs/workload-foo1 + * Example response: + * { + * "queryWorkloadName" : "workload-foo1", + * "nodeConfigs" : { + * { + * "nodeType" : "brokerNode", + * "enforcementProfile": { + * "cpuCostNs": 500, + * "memoryCostBytes": 1000 + * }, + * 
"propagationScheme": { + * "propagationType": "TABLE", + * "values": ["airlineStats"] + * } + * }, + * { + * "nodeType" : "serverNode", + * "enforcementProfile": { + * "cpuCostNs": 1500, + * "memoryCostBytes": 12000 + * }, + * "propagationScheme": { + * "propagationType": "TENANT", + * "values": ["DefaultTenant"] + * } + * } + * } + * } + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/queryWorkloadConfigs/{queryWorkloadName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_QUERY_WORKLOAD_CONFIG) + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get query workload config", notes = "Get workload configs for the workload name") + public String getQueryWorkloadConfig(@PathParam("queryWorkloadName") String queryWorkloadName, + @Context HttpHeaders httpHeaders) { + try { + LOGGER.info("Received request to get workload config for workload: {}", queryWorkloadName); + QueryWorkloadConfig queryWorkloadConfig = _pinotHelixResourceManager.getQueryWorkloadConfig(queryWorkloadName); + if (queryWorkloadConfig == null) { + throw new ControllerApplicationException(LOGGER, "Workload config not found for workload: " + queryWorkloadName, + Response.Status.NOT_FOUND, null); + } + String response = queryWorkloadConfig.toJsonString(); + LOGGER.info("Successfully fetched workload config for workload: {}", queryWorkloadName); + return response; + } catch (Exception e) { + if (e instanceof ControllerApplicationException) { + throw (ControllerApplicationException) e; + } else { + String errorMessage = String.format("Error while getting workload config for workload: %s, error: %s", + queryWorkloadName, e); + throw new ControllerApplicationException(LOGGER, errorMessage, Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + + /** + * API to get all workload configs associated with the instance + * @param instanceName Helix instance name + * @return Map of workload name to instance cost + * Example request: + * 
/queryWorkloadConfigs/instance/Server_localhost_1234 + * Example response: + * { + * "workload1": { + * "cpuCostNs": 100, + * "memoryCostBytes": 100 + * }, + * "workload2": { + * "cpuCostNs": 50, + * "memoryCostBytes": 50 + * } + * } + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/queryWorkloadConfigs/instance/{instanceName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_INSTANCE_QUERY_WORKLOAD_CONFIG) + @Authenticate(AccessType.READ) + @ApiOperation(value = "Get all workload configs associated with the instance", + notes = "Get all workload configs associated with the instance") + public String getQueryWorkloadConfigForInstance(@PathParam("instanceName") String instanceName, + @Context HttpHeaders httpHeaders) { + try { + Map<String, InstanceCost> workloadToInstanceCostMap = _pinotHelixResourceManager.getQueryWorkloadManager() + .getWorkloadToInstanceCostFor(instanceName); + if (workloadToInstanceCostMap == null || workloadToInstanceCostMap.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "No workload configs found for instance: " + instanceName, + Response.Status.NOT_FOUND, null); + } + return JsonUtils.objectToString(workloadToInstanceCostMap); + } catch (Exception e) { + if (e instanceof ControllerApplicationException) { + throw (ControllerApplicationException) e; + } else { + String errorMessage = String.format("Error while getting workload config for instance: %s, error: %s", + instanceName, e); + throw new ControllerApplicationException(LOGGER, errorMessage, Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + /** + * Updates the query workload config + * @param requestString JSON string representing the QueryWorkloadConfig + * Example request: + * { + * "queryWorkloadName" : "workload-foo1", + * "nodeConfigs" : { + * { + * "nodeType" : "brokerNode", + * "enforcementProfile": { + * "cpuCostNs": 500, + * "memoryCostBytes": 1000 + * }, + * "propagationScheme": { + * "propagationType": "TABLE", + * 
"values": ["airlineStats"] + * } + * }, + * { + * "nodeType" : "serverNode", + * "enforcementProfile": { + * "cpuCostNs": 1500, + * "memoryCostBytes": 12000 + * }, + * "propagationScheme": { + * "propagationType": "TENANT", + * "values": ["DefaultTenant"] + * } + * } + * } + * } + * + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/queryWorkloadConfigs") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_QUERY_WORKLOAD_CONFIG) + @Authenticate(AccessType.UPDATE) + @ApiOperation(value = "Update query workload config", notes = "Update workload config for the workload name") + public Response updateQueryWorkloadConfig(String requestString, @Context HttpHeaders httpHeaders) { + try { + LOGGER.info("Received request to update queryWorkloadConfig with request: {}", requestString); + QueryWorkloadConfig queryWorkloadConfig = JsonUtils.stringToObject(requestString, QueryWorkloadConfig.class); + List<String> validationErrors = QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig); + if (!validationErrors.isEmpty()) { + String errorMessage = String.format("Invalid query workload config: %s", validationErrors); + throw new ControllerApplicationException(LOGGER, errorMessage, Response.Status.BAD_REQUEST, null); + } + _pinotHelixResourceManager.setQueryWorkloadConfig(queryWorkloadConfig); + String successMessage = String.format("Query Workload config updated successfully for workload: %s", + queryWorkloadConfig.getQueryWorkloadName()); + LOGGER.info(successMessage); + return Response.ok().entity(successMessage).build(); + } catch (Exception e) { + String errorMessage = String.format("Error when updating query workload request: %s, error: %s", + requestString, e); + throw new ControllerApplicationException(LOGGER, errorMessage, Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Deletes the query workload config + * @param queryWorkloadName Name of the query workload to be deleted + * Example request: + * 
/queryWorkloadConfigs/workload-foo1 + */ + @DELETE + @Produces(MediaType.APPLICATION_JSON) + @Path("/queryWorkloadConfigs/{queryWorkloadName}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_QUERY_WORKLOAD_CONFIG) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Delete query workload config", notes = "Delete workload config for the workload name") + public Response deleteQueryWorkloadConfig(@PathParam("queryWorkloadName") String queryWorkloadName, + @Context HttpHeaders httpHeaders) { + try { + _pinotHelixResourceManager.deleteQueryWorkloadConfig(queryWorkloadName); + String successMessage = String.format("Query Workload config deleted successfully for workload: %s", + queryWorkloadName); + LOGGER.info(successMessage); + return Response.ok().entity(successMessage).build(); + } catch (Exception e) { + String errorMessage = String.format("Error when deleting query workload for workload: %s, error: %s", + queryWorkloadName, e); + throw new ControllerApplicationException(LOGGER, errorMessage, Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * API to refresh propagation for a single query workload config + * This API doesn't update the config, it only triggers the propagation of an existing workload config + * + * @param queryWorkloadName Name of the query workload to refresh + * Example request: + * POST /queryWorkloadConfigs/{queryWorkloadName}/refresh + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/queryWorkloadConfigs/{queryWorkloadName}/refresh") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_QUERY_WORKLOAD_CONFIG) + @Authenticate(AccessType.UPDATE) + @ApiOperation(value = "Refresh query workload config propagation", notes = "Force propagation of an existing config") + public Response refreshQueryWorkloadConfig(@PathParam("queryWorkloadName") String queryWorkloadName, + @Context HttpHeaders httpHeaders) { + try { + LOGGER.info("Received request to refresh workload 
config propagation for workload: {}", queryWorkloadName); + // Fetch existing config + QueryWorkloadConfig existingConfig = _pinotHelixResourceManager.getQueryWorkloadConfig(queryWorkloadName); + if (existingConfig == null) { + throw new ControllerApplicationException(LOGGER, "Workload config not found for workload: " + queryWorkloadName, + Response.Status.NOT_FOUND, null); + } + _pinotHelixResourceManager.getQueryWorkloadManager().propagateWorkloadUpdateMessage(existingConfig); + String successMessage = String.format("Query workload config propagation triggered for workload: %s", + queryWorkloadName); + LOGGER.info(successMessage); + return Response.ok().entity(successMessage).build(); + } catch (Exception e) { + if (e instanceof ControllerApplicationException) { + throw (ControllerApplicationException) e; + } else { + String errorMessage = String.format("Error when refreshing query workload config for workload: %s, error: %s", + queryWorkloadName, e); + throw new ControllerApplicationException(LOGGER, errorMessage, + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index a07caac5f1..5589bce4be 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -105,6 +105,7 @@ import org.apache.pinot.common.lineage.SegmentLineageUtils; import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage; import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage; +import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; 
import org.apache.pinot.common.messages.RunPeriodicTaskMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; @@ -155,6 +156,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; +import org.apache.pinot.controller.workload.QueryWorkloadManager; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.instance.Instance; @@ -170,6 +172,7 @@ import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.user.ComponentType; import org.apache.pinot.spi.config.user.RoleType; import org.apache.pinot.spi.config.user.UserConfig; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; @@ -237,6 +240,7 @@ public class PinotHelixResourceManager { private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; private final LineageManager _lineageManager; + private final QueryWorkloadManager _queryWorkloadManager; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, @@ -263,6 +267,7 @@ public class PinotHelixResourceManager { _lineageUpdaterLocks[i] = new Object(); } _lineageManager = lineageManager; + _queryWorkloadManager = new QueryWorkloadManager(this); } public PinotHelixResourceManager(ControllerConf controllerConf) { @@ -544,6 +549,11 @@ public class PinotHelixResourceManager { .filter(instance -> InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList()); } + 
public List<String> getAllServerInstances() { + return HelixHelper.getAllInstances(_helixAdmin, _helixClusterName).stream().filter(InstanceTypeUtils::isServer) + .collect(Collectors.toList()); + } + /** * Get all instances with the given tag */ @@ -1831,7 +1841,7 @@ public class PinotHelixResourceManager { .put(tableNameWithType, SegmentAssignmentUtils.getInstanceStateMap(brokers, BrokerResourceStateModel.ONLINE)); return is; }); - + _queryWorkloadManager.propagateWorkloadFor(tableNameWithType); LOGGER.info("Adding table {}: Successfully added table", tableNameWithType); } @@ -2195,6 +2205,8 @@ public class PinotHelixResourceManager { // Send update query quota message if quota is specified sendTableConfigRefreshMessage(tableNameWithType); + // TODO: Propagate workload for tables if there is a change in instance characteristics + _queryWorkloadManager.propagateWorkloadFor(tableNameWithType); } public void deleteUser(String username) { @@ -4788,6 +4800,53 @@ public class PinotHelixResourceManager { return tagMinInstanceMap; } + public List<QueryWorkloadConfig> getAllQueryWorkloadConfigs() { + return ZKMetadataProvider.getAllQueryWorkloadConfigs(_propertyStore); + } + + @Nullable + public QueryWorkloadConfig getQueryWorkloadConfig(String queryWorkloadName) { + return ZKMetadataProvider.getQueryWorkloadConfig(_propertyStore, queryWorkloadName); + } + + public void setQueryWorkloadConfig(QueryWorkloadConfig queryWorkloadConfig) { + if (!ZKMetadataProvider.setQueryWorkloadConfig(_propertyStore, queryWorkloadConfig)) { + throw new RuntimeException("Failed to set workload config for queryWorkloadName: " + + queryWorkloadConfig.getQueryWorkloadName()); + } + _queryWorkloadManager.propagateWorkloadUpdateMessage(queryWorkloadConfig); + } + + public void sendQueryWorkloadRefreshMessage(Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) { + instanceToRefreshMessageMap.forEach((instance, message) -> { + Criteria criteria = new Criteria(); + 
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + criteria.setInstanceName(instance); + criteria.setSessionSpecific(true); + + int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, message, null, -1); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} query workload config refresh messages to instance: {}", numMessagesSent, instance); + } else { + LOGGER.warn("No query workload config refresh message sent to instance: {}", instance); + } + }); + } + + public void deleteQueryWorkloadConfig(String workload) { + QueryWorkloadConfig queryWorkloadConfig = getQueryWorkloadConfig(workload); + if (queryWorkloadConfig == null) { + LOGGER.warn("Query workload config for {} does not exist, skipping deletion", workload); + return; + } + _queryWorkloadManager.propagateDeleteWorkloadMessage(queryWorkloadConfig); + ZKMetadataProvider.deleteQueryWorkloadConfig(_propertyStore, workload); + } + + public QueryWorkloadManager getQueryWorkloadManager() { + return _queryWorkloadManager; + } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java new file mode 100644 index 0000000000..c093b98755 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.workload; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.workload.scheme.PropagationScheme; +import org.apache.pinot.controller.workload.scheme.PropagationSchemeProvider; +import org.apache.pinot.controller.workload.scheme.PropagationUtils; +import org.apache.pinot.controller.workload.splitter.CostSplitter; +import org.apache.pinot.controller.workload.splitter.DefaultCostSplitter; +import org.apache.pinot.spi.config.workload.InstanceCost; +import org.apache.pinot.spi.config.workload.NodeConfig; +import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The QueryWorkloadManager is responsible for managing the query workload configuration and propagating/computing + * the cost to be enforced by relevant instances based on the propagation scheme. 
+ */ +public class QueryWorkloadManager { + public static final Logger LOGGER = LoggerFactory.getLogger(QueryWorkloadManager.class); + + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final PropagationSchemeProvider _propagationSchemeProvider; + private final CostSplitter _costSplitter; + + public QueryWorkloadManager(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + _propagationSchemeProvider = new PropagationSchemeProvider(pinotHelixResourceManager); + // TODO: To make this configurable once we have multiple cost splitters implementations + _costSplitter = new DefaultCostSplitter(); + } + + /** + * Propagate the workload to the relevant instances based on the PropagationScheme + * @param queryWorkloadConfig The query workload configuration to propagate + * 1. Resolve the instances based on the node type and propagation scheme + * 2. Calculate the instance cost for each instance + * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances + */ + public void propagateWorkloadUpdateMessage(QueryWorkloadConfig queryWorkloadConfig) { + String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName(); + for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) { + // Resolve the instances based on the node type and propagation scheme + Set<String> instances = resolveInstances(nodeConfig); + if (instances.isEmpty()) { + String errorMsg = String.format("No instances found for Workload: %s", queryWorkloadName); + LOGGER.warn(errorMsg); + continue; + } + Map<String, InstanceCost> instanceCostMap = _costSplitter.computeInstanceCostMap(nodeConfig, instances); + Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = instanceCostMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> new QueryWorkloadRefreshMessage(queryWorkloadName, + QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, entry.getValue()))); + 
// Send the QueryWorkloadRefreshMessage to the instances + _pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap); + } + } + + /** + * Propagate delete workload refresh message for the given queryWorkloadConfig + * @param queryWorkloadConfig The query workload configuration to delete + * 1. Resolve the instances based on the node type and propagation scheme + * 2. Send the {@link QueryWorkloadRefreshMessage} with DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances + */ + public void propagateDeleteWorkloadMessage(QueryWorkloadConfig queryWorkloadConfig) { + String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName(); + for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) { + Set<String> instances = resolveInstances(nodeConfig); + if (instances.isEmpty()) { + String errorMsg = String.format("No instances found for Workload: %s", queryWorkloadName); + LOGGER.warn(errorMsg); + continue; + } + Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = instances.stream() + .collect(Collectors.toMap(instance -> instance, instance -> new QueryWorkloadRefreshMessage(queryWorkloadName, + QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, null))); + _pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap); + } + } + + /** + * Propagate the workload for the given table name, it does fast exits if queryWorkloadConfigs is empty + * @param tableName The table name to propagate the workload for, it can be a rawTableName or a tableNameWithType + * if rawTableName is provided, it will resolve all available tableTypes and propagate the workload for each tableType + * + * This method performs the following steps: + * 1. Find all the helix tags associated with the table + * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags + * 3. 
Propagate the workload cost for instances associated with the workloads + */ + public void propagateWorkloadFor(String tableName) { + try { + List<QueryWorkloadConfig> queryWorkloadConfigs = _pinotHelixResourceManager.getAllQueryWorkloadConfigs(); + if (queryWorkloadConfigs.isEmpty()) { + return; + } + // Get the helixTags associated with the table + List<String> helixTags = PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName); + // Find all workloads associated with the helix tags + Set<QueryWorkloadConfig> queryWorkloadConfigsForTags = + PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, helixTags, queryWorkloadConfigs); + // Propagate the workload for each QueryWorkloadConfig + for (QueryWorkloadConfig queryWorkloadConfig : queryWorkloadConfigsForTags) { + propagateWorkloadUpdateMessage(queryWorkloadConfig); + } + } catch (Exception e) { + String errorMsg = String.format("Failed to propagate workload for table: %s", tableName); + LOGGER.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + } + + /** + * Get all the workload costs associated with the given instance and node type + * 1. Find all the helix tags associated with the instance + * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags + * 3. 
Find the instance associated with the {@link QueryWorkloadConfig} and node type + * + * @param instanceName The instance name to get the workload costs for + * @return A map of workload name to {@link InstanceCost} for the given instance and node type + */ + public Map<String, InstanceCost> getWorkloadToInstanceCostFor(String instanceName) { + try { + Map<String, InstanceCost> workloadToInstanceCostMap = new HashMap<>(); + List<QueryWorkloadConfig> queryWorkloadConfigs = _pinotHelixResourceManager.getAllQueryWorkloadConfigs(); + if (queryWorkloadConfigs.isEmpty()) { + LOGGER.warn("No query workload configs found in zookeeper"); + return workloadToInstanceCostMap; + } + // Find all the helix tags associated with the instance + InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceName); + if (instanceConfig == null) { + LOGGER.warn("Instance config not found for instance: {}", instanceName); + return workloadToInstanceCostMap; + } + NodeConfig.Type nodeType; + if (InstanceTypeUtils.isServer(instanceName)) { + nodeType = NodeConfig.Type.SERVER_NODE; + } else if (InstanceTypeUtils.isBroker(instanceName)) { + nodeType = NodeConfig.Type.BROKER_NODE; + } else { + LOGGER.warn("Unsupported instance type: {}, cannot compute workload costs", instanceName); + return workloadToInstanceCostMap; + } + + // Find all workloads associated with the helix tags + Set<QueryWorkloadConfig> queryWorkloadConfigsForTags = + PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, instanceConfig.getTags(), + queryWorkloadConfigs); + // Calculate the instance cost from each workload + for (QueryWorkloadConfig queryWorkloadConfig : queryWorkloadConfigsForTags) { + for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) { + if (nodeConfig.getNodeType() == nodeType) { + Set<String> instances = resolveInstances(nodeConfig); + InstanceCost instanceCost = _costSplitter.computeInstanceCost(nodeConfig, instances, instanceName); + if 
(instanceCost != null) { + workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), instanceCost); + } + break; + } + } + } + return workloadToInstanceCostMap; + } catch (Exception e) { + String errorMsg = String.format("Failed to get workload to instance cost map for instance: %s", instanceName); + LOGGER.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + } + + private Set<String> resolveInstances(NodeConfig nodeConfig) { + PropagationScheme propagationScheme = + _propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType()); + return propagationScheme.resolveInstances(nodeConfig); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java new file mode 100644 index 0000000000..e9a373733b --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.workload.scheme; + +import java.util.HashSet; +import java.util.Set; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.workload.NodeConfig; + + +public class DefaultPropagationScheme implements PropagationScheme { + + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public DefaultPropagationScheme(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + } + + @Override + public Set<String> resolveInstances(NodeConfig nodeConfig) { + Set<String> instances; + NodeConfig.Type nodeType = nodeConfig.getNodeType(); + switch (nodeType) { + case BROKER_NODE: + instances = new HashSet<>(_pinotHelixResourceManager.getAllBrokerInstances()); + break; + case SERVER_NODE: + instances = new HashSet<>(_pinotHelixResourceManager.getAllServerInstances()); + break; + default: + throw new IllegalArgumentException("Invalid node type: " + nodeType); + } + return instances; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java new file mode 100644 index 0000000000..d73602d6ec --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.workload.scheme;

import java.util.Set;
import org.apache.pinot.spi.config.workload.NodeConfig;

/**
 * PropagationScheme is used to resolve instances based on the {@link NodeConfig}.
 *
 * 1. It helps to identify which instances to propagate the workload to based on the node configuration
 * 2. It helps decide among which instances the
 *    {@link org.apache.pinot.spi.config.workload.EnforcementProfile} should be split
 *
 * Implementations differ only in how they interpret the propagation-scheme values in the node
 * config (table names, tenant names, or cluster-wide defaults).
 */
public interface PropagationScheme {
  /**
   * Resolve the instances based on the node type and node configuration.
   *
   * @param nodeConfig the {@link NodeConfig} to resolve the instances for
   * @return the set of instance names to propagate the workload to; may be empty, never null
   */
  Set<String> resolveInstances(NodeConfig nodeConfig);
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.workload.scheme; + +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; + +public class PropagationSchemeProvider { + + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public PropagationSchemeProvider(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + } + + public PropagationScheme getPropagationScheme( + org.apache.pinot.spi.config.workload.PropagationScheme.Type schemeType) { + switch (schemeType) { + case TABLE: + return new TablePropagationScheme(_pinotHelixResourceManager); + case TENANT: + return new TenantPropagationScheme(_pinotHelixResourceManager); + default: + return new DefaultPropagationScheme(_pinotHelixResourceManager); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java new file mode 100644 index 0000000000..fa720e9e73 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.workload.scheme;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.workload.NodeConfig;
import org.apache.pinot.spi.config.workload.PropagationScheme;
import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;


/**
 * This class provides utility methods for workload propagation.
 *
 * All methods are static and read cluster state through the supplied
 * {@link PinotHelixResourceManager}; no state is cached between calls.
 */
public class PropagationUtils {

  // Utility class; not meant to be instantiated
  private PropagationUtils() {
  }

  /**
   * Get the mapping tableNameWithType → {BROKER_NODE→brokerTag, SERVER_NODE→(serverTag + overrides)}.
   *
   * 1. Get all table configs from the PinotHelixResourceManager
   * 2. For each table config, extract the tenant config
   * 3. For each tenant config, get the broker and server tags
   * 4. Populate the helix tags for BROKER_NODE and SERVER_NODE separately
   *
   * NOTE(review): the broker tag is explicitly removed from the server-tag set, so a tenant that
   * reuses the same helix tag for brokers and servers will surface it only under BROKER_NODE.
   */
  public static Map<String, Map<NodeConfig.Type, Set<String>>> getTableToHelixTags(
      PinotHelixResourceManager pinotResourceManager) {
    Map<String, Map<NodeConfig.Type, Set<String>>> tableToTags = new HashMap<>();
    for (TableConfig tableConfig : pinotResourceManager.getAllTableConfigs()) {
      TenantConfig tenantConfig = tableConfig.getTenantConfig();
      TableType tableType = tableConfig.getTableType();

      // Gather all relevant tags for this tenant (broker tag + type-specific server tags)
      List<String> tenantTags = new ArrayList<>();
      collectHelixTagsForTable(tenantTags, tenantConfig, tableType);

      // Populate the helix tags for BROKER_NODE and SERVER_NODE separately to provide flexibility
      // in workload propagation to direct the workload to only specific node types
      String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
      Set<String> brokerTags = Collections.singleton(brokerTag);

      Set<String> serverTags = new HashSet<>(tenantTags);
      serverTags.remove(brokerTag);

      Map<NodeConfig.Type, Set<String>> nodeTypeToTags = new EnumMap<>(NodeConfig.Type.class);
      nodeTypeToTags.put(NodeConfig.Type.BROKER_NODE, brokerTags);
      nodeTypeToTags.put(NodeConfig.Type.SERVER_NODE, serverTags);

      tableToTags.put(tableConfig.getTableName(), nodeTypeToTags);
    }
    return tableToTags;
  }

  /**
   * Appends the broker tag and the table-type-specific server tag(s) for the tenant to {@code tags}.
   * May append duplicates across repeated calls; callers that need uniqueness must dedupe.
   */
  private static void collectHelixTagsForTable(List<String> tags, TenantConfig tenantConfig, TableType tableType) {
    tags.add(TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker()));
    if (tableType == TableType.OFFLINE) {
      tags.add(TagNameUtils.getOfflineTagForTenant(tenantConfig.getServer()));
    } else {
      // Returns the realtime tag if completed server tag is not set
      String completedServerTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
      // Returns the realtime tag if consuming server tag is not set
      String consumingServerTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
      if (completedServerTag.equals(consumingServerTag)) {
        tags.add(completedServerTag);
      } else {
        tags.add(consumingServerTag);
        tags.add(completedServerTag);
      }
    }
  }

  /**
   * Get the helix tags for a given table name.
   * If the table name does not have a type suffix, it will return both offline and realtime tags.
   *
   * NOTE(review): the returned list may contain duplicate tags (e.g. the broker tag once per
   * table type); callers currently only use it for membership/intersection checks.
   */
  public static List<String> getHelixTagsForTable(PinotHelixResourceManager pinotResourceManager, String tableName) {
    List<String> combinedTags = new ArrayList<>();
    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
    List<String> tablesWithType = (tableType == null)
        ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
        TableNameBuilder.REALTIME.tableNameWithType(tableName))
        : Collections.singletonList(tableName);
    for (String table : tablesWithType) {
      TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
      if (tableConfig != null) {
        collectHelixTagsForTable(combinedTags, tableConfig.getTenantConfig(), tableConfig.getTableType());
      }
    }
    return combinedTags;
  }

  /**
   * Get the mapping between helix tag -> instances, built from every instance config in the cluster.
   */
  public static Map<String, Set<String>> getHelixTagToInstances(PinotHelixResourceManager pinotResourceManager) {
    Map<String, Set<String>> tagToInstances = new HashMap<>();
    for (InstanceConfig instanceConfig : pinotResourceManager.getAllHelixInstanceConfigs()) {
      String instanceName = instanceConfig.getInstanceName();
      for (String helixTag : instanceConfig.getTags()) {
        tagToInstances.computeIfAbsent(helixTag, tag -> new HashSet<>()).add(instanceName);
      }
    }
    return tagToInstances;
  }

  /**
   * Returns the set of {@link QueryWorkloadConfig}s that match any of the given Helix tags.
   *
   * This method filters the provided list of QueryWorkloadConfigs based on whether their propagation
   * targets intersect with the specified `filterTags`. The matching is performed based on the
   * propagation type defined for each node in the config:
   *
   * - For {@code TENANT} propagation:
   *   1. Each value in the propagation scheme is treated as a tenant name or direct Helix tag.
   *   2. If the value is a recognized Helix tag (broker/server), it is used directly.
   *   3. Otherwise, the value is resolved to possible broker and server tags for the tenant.
   *   4. If any resolved tag matches one of the `filterTags`, the config is included.
   *
   * - For {@code TABLE} propagation:
   *   1. Table names are expanded to include type-suffixed forms (OFFLINE and/or REALTIME).
   *   2. These table names are mapped to corresponding Helix tags using node type.
   *   3. If any of the mapped tags intersect with `filterTags`, the config is included.
   *
   * @param pinotHelixResourceManager The resource manager used to look up table and instance metadata.
   * @param filterTags The set of Helix tags used for filtering configs.
   * @param queryWorkloadConfigs The full list of workload configs to evaluate.
   * @return A set of workload configs whose propagation targets intersect with the filterTags.
   */
  public static Set<QueryWorkloadConfig> getQueryWorkloadConfigsForTags(
      PinotHelixResourceManager pinotHelixResourceManager, List<String> filterTags,
      List<QueryWorkloadConfig> queryWorkloadConfigs) {
    Set<QueryWorkloadConfig> matchedConfigs = new HashSet<>();
    Map<String, Map<NodeConfig.Type, Set<String>>> tableToHelixTags = getTableToHelixTags(pinotHelixResourceManager);

    for (QueryWorkloadConfig queryWorkloadConfig : queryWorkloadConfigs) {
      for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
        PropagationScheme scheme = nodeConfig.getPropagationScheme();

        if (scheme.getPropagationType() == PropagationScheme.Type.TENANT) {
          for (String tenant : scheme.getValues()) {
            // A value that is already a fully-qualified helix tag is matched as-is; otherwise
            // expand the tenant name to all tags it could map to (broker/offline/realtime)
            Set<String> resolvedTags = TagNameUtils.isOfflineServerTag(tenant)
                || TagNameUtils.isRealtimeServerTag(tenant) || TagNameUtils.isBrokerTag(tenant)
                ? Collections.singleton(tenant)
                : new HashSet<>(getAllPossibleHelixTagsFor(tenant));
            if (!Collections.disjoint(resolvedTags, filterTags)) {
              matchedConfigs.add(queryWorkloadConfig);
              // Config already matched; no need to examine the remaining tenant values
              break;
            }
          }
        } else if (scheme.getPropagationType() == PropagationScheme.Type.TABLE) {
          for (String tableName : scheme.getValues()) {
            TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
            List<String> tablesWithType = (tableType == null)
                ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
                TableNameBuilder.REALTIME.tableNameWithType(tableName))
                : Collections.singletonList(tableName);
            for (String tableWithType : tablesWithType) {
              Set<String> resolvedTags = tableToHelixTags
                  .getOrDefault(tableWithType, Collections.emptyMap())
                  .getOrDefault(nodeConfig.getNodeType(), Collections.emptySet());
              if (!Collections.disjoint(resolvedTags, filterTags)) {
                matchedConfigs.add(queryWorkloadConfig);
                // Breaks out of the table-type expansion only; matchedConfigs is a set, so any
                // further matches for this config are harmless duplicates
                break;
              }
            }
          }
        }
      }
    }
    return matchedConfigs;
  }

  /**
   * Expands a plain tenant name into every helix tag it could map to: broker, offline-server,
   * and realtime-server tags.
   */
  private static List<String> getAllPossibleHelixTagsFor(String tenantName) {
    List<String> helixTags = new ArrayList<>();
    helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName));
    helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName));
    helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName));
    return helixTags;
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.workload.scheme; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.workload.NodeConfig; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + +/** + * TablePropagationScheme is used to resolve instances based on the {@link NodeConfig} and {@link NodeConfig.Type} + * It resolves the instances based on the table names specified in the node configuration + */ +public class TablePropagationScheme implements PropagationScheme { + + private static PinotHelixResourceManager _pinotHelixResourceManager; + + public TablePropagationScheme(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + } + + @Override + public Set<String> resolveInstances(NodeConfig nodeConfig) { + Set<String> instances = new HashSet<>(); + List<String> tableNames = nodeConfig.getPropagationScheme().getValues(); + Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags + = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager); + Map<String, Set<String>> helixTagToInstances + = PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager); + for (String tableName : tableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + List<String> tablesWithType = new ArrayList<>(); + 
if (tableType == null) { + // Get both offline and realtime table names if type is not present. + tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName)); + tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName)); + } else { + tablesWithType.add(tableName); + } + for (String tableWithType : tablesWithType) { + Map<NodeConfig.Type, Set<String>> nodeToHelixTags = tableWithTypeToHelixTags.get(tableWithType); + if (nodeToHelixTags != null) { + Set<String> helixTags = nodeToHelixTags.get(nodeConfig.getNodeType()); + if (helixTags != null) { + for (String helixTag : helixTags) { + Set<String> helixInstances = helixTagToInstances.get(helixTag); + if (helixInstances != null) { + instances.addAll(helixInstances); + } + } + } + } + } + } + return instances; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java new file mode 100644 index 0000000000..4564b0c282 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.workload.scheme; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.workload.NodeConfig; + + +/** + * TenantPropagationScheme is used to resolve instances based on the {@link NodeConfig} and {@link NodeConfig.Type} + * It resolves the instances based on the tenants specified in the node configuration + */ +public class TenantPropagationScheme implements PropagationScheme { + + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantPropagationScheme(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + } + + @Override + public Set<String> resolveInstances(NodeConfig nodeConfig) { + Map<String, Set<String>> helixTagToInstances + = PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager); + Set<String> allInstances = new HashSet<>(); + List<String> tenantNames = nodeConfig.getPropagationScheme().getValues(); + NodeConfig.Type nodeType = nodeConfig.getNodeType(); + // Get the unique set of helix tags for the tenants + Set<String> helixTags = new HashSet<>(); + for (String tenantName : tenantNames) { + if (nodeType == NodeConfig.Type.BROKER_NODE) { + helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName)); + } else if (nodeType == NodeConfig.Type.SERVER_NODE) { + if (TagNameUtils.isOfflineServerTag(tenantName) || TagNameUtils.isRealtimeServerTag(tenantName)) { + helixTags.add(tenantName); + } else { + helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName)); + helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName)); + } + } + } + // Get the instances for the helix tags + for (String helixTag 
: helixTags) { + Set<String> instances = helixTagToInstances.get(helixTag); + if (instances != null) { + allInstances.addAll(instances); + } + } + return allInstances; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java new file mode 100644 index 0000000000..9f7f070fb1 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.workload.splitter; + +import java.util.Map; +import java.util.Set; +import org.apache.pinot.spi.config.workload.InstanceCost; +import org.apache.pinot.spi.config.workload.NodeConfig; + +/** + * Interface for splitting the cost of a workload between instances. + */ +public interface CostSplitter { + /** + * Computes the cost for each instance in the given set of instances. 
   *
   * @param nodeConfig the node configuration carrying the enforcement profile to split
   * @param instances names of all instances involved
   * @return a map from instance identifier to the cost for that instance; never null
   */
  Map<String, InstanceCost> computeInstanceCostMap(NodeConfig nodeConfig, Set<String> instances);

  /**
   * Computes the cost for a specific instance.
   *
   * @param nodeConfig the node configuration carrying the enforcement profile to split
   * @param instances names of all instances involved; the per-instance share is derived from
   *                  the full set
   * @param instance the instance identifier for which to compute the cost
   * @return the cost for the specified instance
   */
  InstanceCost computeInstanceCost(NodeConfig nodeConfig, Set<String> instances, String instance);
}
package org.apache.pinot.controller.workload.splitter;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.config.workload.EnforcementProfile;
import org.apache.pinot.spi.config.workload.InstanceCost;
import org.apache.pinot.spi.config.workload.NodeConfig;


/**
 * Default {@link CostSplitter} that divides the node's enforcement budget evenly across all
 * instances: every instance receives {@code total / instances.size()} of the CPU and memory cost.
 *
 * <p>Note: integer division means up to {@code instances.size() - 1} units of each budget may be
 * left unassigned; this is intentional (the split never over-allocates the budget).</p>
 */
public class DefaultCostSplitter implements CostSplitter {

  @Override
  public Map<String, InstanceCost> computeInstanceCostMap(NodeConfig nodeConfig, Set<String> instances) {
    // Uniform split: the per-instance cost is identical, so compute it once and share the object.
    InstanceCost cost = computeInstanceCost(nodeConfig, instances, null);
    Map<String, InstanceCost> costMap = new HashMap<>(instances.size());
    for (String instance : instances) {
      costMap.put(instance, cost);
    }
    return costMap;
  }

  /**
   * {@inheritDoc}
   *
   * <p>The {@code instance} argument is ignored because this splitter assigns the same cost to
   * every instance.</p>
   *
   * @throws IllegalArgumentException if {@code instances} is empty (the budget cannot be split)
   */
  @Override
  public InstanceCost computeInstanceCost(NodeConfig nodeConfig, Set<String> instances, String instance) {
    long totalInstances = instances.size();
    if (totalInstances == 0) {
      // Guard against division by zero below; an empty instance set indicates a propagation bug upstream.
      throw new IllegalArgumentException(
          "Cannot compute instance cost: no instances provided for node type: " + nodeConfig.getNodeType());
    }
    EnforcementProfile enforcementProfile = nodeConfig.getEnforcementProfile();

    long cpuCostNs = enforcementProfile.getCpuCostNs() / totalInstances;
    long memoryCostBytes = enforcementProfile.getMemoryCostBytes() / totalInstances;

    return new InstanceCost(cpuCostNs, memoryCostBytes);
  }
}
package org.apache.pinot.controller.workload;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.workload.scheme.PropagationUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.workload.EnforcementProfile;
import org.apache.pinot.spi.config.workload.NodeConfig;
import org.apache.pinot.spi.config.workload.PropagationScheme;
import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


/**
 * Unit tests for {@link PropagationUtils}: table-to-helix-tag resolution, helix-tag-to-instance
 * resolution, and filtering of workload configs by helix tags.
 */
public class PropagationUtilsTest {

  private PinotHelixResourceManager _pinotHelixResourceManager;

  @BeforeMethod
  public void setUp() {
    // Re-create the mock before every test so stubbings from one test cannot leak into another.
    _pinotHelixResourceManager = Mockito.mock(PinotHelixResourceManager.class);
  }

  @Test
  public void getTableToHelixTagsTest() {
    // Create a list of table configurations
    List<TableConfig> tableConfigs = new ArrayList<>();
    tableConfigs.add(createTableConfig("table1", "serverTag1", "brokerTenant1", TableType.OFFLINE));
    tableConfigs.add(createTableConfig("table2", "serverTag2", "brokerTenant2", TableType.REALTIME));
    // Mock the behavior of getAllTableConfigs to return the list of table configurations
    Mockito.when(_pinotHelixResourceManager.getAllTableConfigs()).thenReturn(tableConfigs);
    // Call the method to get table to Helix tags
    Map<String, Map<NodeConfig.Type, Set<String>>> tableToHelixTags
        = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
    // Verify the results. Map.of is used instead of double-brace initialization (which creates an
    // anonymous subclass holding a reference to the enclosing test instance).
    Map<String, Map<NodeConfig.Type, Set<String>>> expectedTags = new HashMap<>();
    expectedTags.put("table1_OFFLINE", Map.of(
        NodeConfig.Type.SERVER_NODE, Set.of("serverTag1_OFFLINE"),
        NodeConfig.Type.BROKER_NODE, Set.of("brokerTenant1_BROKER")));
    expectedTags.put("table2_REALTIME", Map.of(
        NodeConfig.Type.SERVER_NODE, Set.of("serverTag2_REALTIME"),
        NodeConfig.Type.BROKER_NODE, Set.of("brokerTenant2_BROKER")));

    Assert.assertEquals(tableToHelixTags.size(), expectedTags.size(),
        "Expected size of table to helix tags mapping does not match");
    for (Map.Entry<String, Map<NodeConfig.Type, Set<String>>> tableEntry : expectedTags.entrySet()) {
      String tableName = tableEntry.getKey();
      Map<NodeConfig.Type, Set<String>> expectedNodeTags = tableEntry.getValue();
      // For each node type in the expected map, assert the tag exists
      for (Map.Entry<NodeConfig.Type, Set<String>> entry : expectedNodeTags.entrySet()) {
        NodeConfig.Type nodeType = entry.getKey();
        Set<String> expectedTag = entry.getValue();
        Assert.assertEquals(tableToHelixTags.get(tableName).get(nodeType), expectedTag,
            "Expected " + expectedTag + " for " + tableName + " with node type " + nodeType
                + " but found " + tableToHelixTags.get(tableName).get(nodeType));
      }
    }
  }

  @Test
  public void getHelixTagsForTableTest() {
    // Mock the behavior of getTableConfig to return per-type table configs
    TableConfig tableConfig = createTableConfig("table1", "serverTag1", "brokerTenant1", TableType.OFFLINE);
    TableConfig tableConfig2 = createTableConfig("table1", "serverTag2", "brokerTenant2", TableType.REALTIME);
    Mockito.when(_pinotHelixResourceManager.getTableConfig("table1_OFFLINE")).thenReturn(tableConfig);
    Mockito.when(_pinotHelixResourceManager.getTableConfig("table1_REALTIME")).thenReturn(tableConfig2);

    // Define the expected helix tags for the table
    Map<String, Set<String>> expected = new HashMap<>();
    expected.put("table1_OFFLINE", Set.of("serverTag1_OFFLINE", "brokerTenant1_BROKER"));
    expected.put("table1_REALTIME", Set.of("serverTag2_REALTIME", "brokerTenant2_BROKER"));

    for (Map.Entry<String, Set<String>> entry : expected.entrySet()) {
      String tableName = entry.getKey();
      Set<String> expectedHelixTags = entry.getValue();
      // Call the method to get helix tags for the table
      List<String> helixTags = PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager,
          tableName);
      // Verify the results
      for (String helixTag : expectedHelixTags) {
        Assert.assertTrue(helixTags.contains(helixTag),
            "Expected helix tag " + helixTag + " for table " + tableName + " but found " + helixTags);
      }
    }
  }

  @Test
  public void getHelixTagToInstancesTest() {
    // Create a list of instance configurations
    List<InstanceConfig> instanceConfigs = List.of(
        createInstanceConfig("instance1", List.of("serverTag1_OFFLINE")),
        createInstanceConfig("instance2", List.of("serverTag1_OFFLINE")),
        createInstanceConfig("instance3", List.of("brokerTenant1_BROKER")),
        createInstanceConfig("instance4", List.of("brokerTenant1_BROKER"))
    );
    Mockito.when(_pinotHelixResourceManager.getAllHelixInstanceConfigs()).thenReturn(instanceConfigs);
    // Call the method to get Helix tag to instances mapping
    Map<String, Set<String>> helixTagToInstances
        = PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
    // Verify the results
    Map<String, Set<String>> expected = new HashMap<>();
    expected.put("serverTag1_OFFLINE", Set.of("instance1", "instance2"));
    expected.put("brokerTenant1_BROKER", Set.of("instance3", "instance4"));

    Assert.assertEquals(helixTagToInstances.size(), expected.size(),
        "Expected size of helix tag to instances mapping does not match");
    for (Map.Entry<String, Set<String>> entry : expected.entrySet()) {
      String helixTag = entry.getKey();
      Set<String> expectedInstances = entry.getValue();
      Assert.assertTrue(helixTagToInstances.containsKey(helixTag),
          "Expected helix tag " + helixTag + " but not found in the mapping");
      for (String instance : expectedInstances) {
        Assert.assertTrue(helixTagToInstances.get(helixTag).contains(instance),
            "Expected instance " + instance + " for helix tag " + helixTag + " but found "
                + helixTagToInstances.get(helixTag));
      }
    }
  }

  @Test
  public void getQueryWorkloadConfigsForTagsTest() {
    // Create a list of query workload configurations covering TABLE and TENANT propagation,
    // with both tagged and untagged tenant values to exercise tag normalization.
    QueryWorkloadConfig workloadConfig1 = createQueryWorkloadConfig("workload1",
        new PropagationScheme(PropagationScheme.Type.TABLE, List.of("table1", "table2")),
        new PropagationScheme(PropagationScheme.Type.TABLE, List.of("table1", "table2")));
    QueryWorkloadConfig workloadConfig2 = createQueryWorkloadConfig("workload2",
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("serverTag1")),
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("brokerTenant1_BROKER")));
    QueryWorkloadConfig workloadConfig3 = createQueryWorkloadConfig("workload3",
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("serverTag2_REALTIME")),
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("brokerTenant2")));
    List<QueryWorkloadConfig> queryWorkloadConfigs = List.of(workloadConfig1, workloadConfig2, workloadConfig3);
    // Create TableConfig for the workload
    List<TableConfig> tableConfigs = List.of(
        createTableConfig("table1", "serverTag1", "brokerTenant1", TableType.OFFLINE),
        createTableConfig("table2", "serverTag2", "brokerTenant2", TableType.REALTIME)
    );
    // Mock the behavior of getAllTableConfigs to return the list of table configurations
    Mockito.when(_pinotHelixResourceManager.getAllTableConfigs()).thenReturn(tableConfigs);

    List<String> helixTags = List.of("serverTag1_OFFLINE", "brokerTenant1_BROKER",
        "serverTag2_REALTIME", "brokerTenant2_BROKER");
    Set<QueryWorkloadConfig> workloadConfigsForTags =
        PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, helixTags,
            queryWorkloadConfigs);
    // Verify the results
    Set<QueryWorkloadConfig> expectedWorkloadConfigs = Set.of(workloadConfig1, workloadConfig2, workloadConfig3);
    Assert.assertEquals(workloadConfigsForTags.size(), expectedWorkloadConfigs.size(),
        "Expected size of workload configs for tags does not match");
    for (QueryWorkloadConfig workloadConfig : workloadConfigsForTags) {
      Assert.assertTrue(expectedWorkloadConfigs.contains(workloadConfig),
          "Expected workload config " + workloadConfig.getQueryWorkloadName()
              + " but not found in the expected set");
    }
  }

  /** Builds a minimal table config with the given server/broker tenants. */
  private TableConfig createTableConfig(String tableName, String serverTag, String brokerTenant, TableType type) {
    return new TableConfigBuilder(type)
        .setTableName(tableName)
        .setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy")
        .setNumReplicas(1)
        .setBrokerTenant(brokerTenant)
        .setServerTenant(serverTag)
        .setLoadMode("HEAP")
        .setSegmentVersion(null)
        .build();
  }

  /** Builds a Helix instance config carrying the given tags. */
  private InstanceConfig createInstanceConfig(String instanceName, List<String> helixTags) {
    InstanceConfig instanceConfig = new InstanceConfig(instanceName);
    for (String helixTag : helixTags) {
      instanceConfig.addTag(helixTag);
    }
    return instanceConfig;
  }

  /** Builds a workload config with one server node and one broker node sharing a fixed profile. */
  private QueryWorkloadConfig createQueryWorkloadConfig(String name, PropagationScheme serverScheme,
      PropagationScheme brokerScheme) {
    EnforcementProfile enforcementProfile = new EnforcementProfile(10, 10);
    return new QueryWorkloadConfig(name, List.of(
        new NodeConfig(NodeConfig.Type.SERVER_NODE, enforcementProfile, serverScheme),
        new NodeConfig(NodeConfig.Type.BROKER_NODE, enforcementProfile, brokerScheme)
    ));
  }
}
queryWorkloadMessageHandlerFactory = + new QueryWorkloadMessageHandlerFactory(serverMetrics); + _helixManager.getMessagingService() + .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), + queryWorkloadMessageHandlerFactory); serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L); _helixManager.addPreConnectCallback( diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java new file mode 100644 index 0000000000..bdf91fc7ff --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.starter.helix; + +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.handling.HelixTaskResult; +import org.apache.helix.messaging.handling.MessageHandler; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.Message; +import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.config.workload.InstanceCost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueryWorkloadMessageHandlerFactory implements MessageHandlerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(QueryWorkloadMessageHandlerFactory.class); + private final ServerMetrics _metrics; + + public QueryWorkloadMessageHandlerFactory(ServerMetrics metrics) { + _metrics = metrics; + } + + @Override + public MessageHandler createHandler(Message message, NotificationContext context) { + String messageType = message.getMsgSubType(); + if (messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE) + || messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) { + return new QueryWorkloadRefreshMessageHandler(new QueryWorkloadRefreshMessage(message), context); + } else { + throw new IllegalArgumentException("Unknown message subtype: " + messageType); + } + } + + // Gets called once during start up. We must return the same message type that this factory is registered for. 
+ @Override + public String getMessageType() { + return Message.MessageType.USER_DEFINE_MSG.toString(); + } + + @Override + public void reset() { + LOGGER.info("Reset called"); + } + + private static class QueryWorkloadRefreshMessageHandler extends MessageHandler { + final String _queryWorkloadName; + final InstanceCost _instanceCost; + + QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage, + NotificationContext context) { + super(queryWorkloadRefreshMessage, context); + _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName(); + _instanceCost = queryWorkloadRefreshMessage.getInstanceCost(); + } + + @Override + public HelixTaskResult handleMessage() { + // TODO: Add logic to invoke the query workload manager to refresh/delete the query workload config + HelixTaskResult result = new HelixTaskResult(); + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) { + LOGGER.error("Got error while refreshing query workload config for query workload: {} (error code: {}," + + " error type: {})", _queryWorkloadName, errorCode, errorType, e); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java new file mode 100644 index 0000000000..b520e6c4a5 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.workload; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import org.apache.pinot.spi.config.BaseJsonConfig; + +/** + * Defines the resource enforcement profile for a node within a query workload. + * <p> + * This profile specifies the maximum CPU time (in nanoseconds) and maximum memory (in bytes) + * that queries under this workload are allowed to consume on the node. + * </p> + * + * @see QueryWorkloadConfig + * @see NodeConfig + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class EnforcementProfile extends BaseJsonConfig { + + private static final String CPU_COST_NS = "cpuCostNs"; + private static final String MEMORY_COST_BYTES = "memoryCostBytes"; + + @JsonPropertyDescription("Max CPU cost allowed for the workload") + private long _cpuCostNs; + @JsonPropertyDescription("Max memory cost allowed for the workload") + private long _memoryCostBytes; + + /** + * Constructs an EnforcementProfile with specified resource limits. 
+ * + * @param cpuCostNs maximum allowed CPU cost in nanoseconds for the workload + * @param memoryCostBytes maximum allowed memory cost in bytes for the workload + */ + public EnforcementProfile(@JsonProperty(CPU_COST_NS) long cpuCostNs, + @JsonProperty(MEMORY_COST_BYTES) long memoryCostBytes) { + _cpuCostNs = cpuCostNs; + _memoryCostBytes = memoryCostBytes; + } + + /** + * Returns the maximum CPU cost allowed for this workload. + * + * @return CPU cost limit in nanoseconds + */ + public long getCpuCostNs() { + return _cpuCostNs; + } + + /** + * Returns the maximum memory cost allowed for this workload. + * + * @return memory cost limit in bytes + */ + public long getMemoryCostBytes() { + return _memoryCostBytes; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java new file mode 100644 index 0000000000..a7befaaed1 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.config.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +/** + * Represents the resource cost profile of an instance for query workload enforcement. + * <p> + * This class defines the CPU and memory cost thresholds that an instance can incur + * when executing queries, used when propagating workload configurations to individual instances. + * </p> + * + */ +public class InstanceCost { + + private static final String CPU_COST_NS = "cpuCostNs"; + private static final String MEMORY_COST_BYTES = "memoryCostBytes"; + + /** + * The CPU cost threshold for the instance, in nanoseconds. + */ + @JsonPropertyDescription("CPU cost of the instance in nanoseconds") + private long _cpuCostNs; + /** + * The memory cost threshold for the instance, in bytes. + */ + @JsonPropertyDescription("Memory cost of the instance in bytes") + private long _memoryCostBytes; + + /** + * Constructs an InstanceCost profile with specified CPU and memory thresholds. + * + * @param cpuCostNs CPU cost threshold in nanoseconds for this instance + * @param memoryCostBytes memory cost threshold in bytes for this instance + */ + @JsonCreator + public InstanceCost(@JsonProperty(CPU_COST_NS) long cpuCostNs, + @JsonProperty(MEMORY_COST_BYTES) long memoryCostBytes) { + _cpuCostNs = cpuCostNs; + _memoryCostBytes = memoryCostBytes; + } + + /** + * Returns the CPU cost threshold for this instance. + * + * @return CPU cost limit in nanoseconds + */ + public long getCpuCostNs() { + return _cpuCostNs; + } + + /** + * Returns the memory cost threshold for this instance. 
+ * + * @return memory cost limit in bytes + */ + public long getMemoryCostBytes() { + return _memoryCostBytes; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java new file mode 100644 index 0000000000..25be645191 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonValue; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + +/** + * Represents the configuration for a specific node type in a query workload. 
+ * <p> + * Each NodeConfig specifies: + * <ul> + * <li><strong>Node Type:</strong> The role of the node in processing queries.</li> + * <li><strong>Enforcement Profile:</strong> Resource limits (CPU and memory) applied to this node.</li> + * <li><strong>Propagation Scheme:</strong> Optional instructions for cascading configs to downstream nodes.</li> + * </ul> + * </p> + * <p> + * This class is used within {@link QueryWorkloadConfig} to define per-node settings + * that tailor query execution behavior based on workload classification. + * </p> + * + * @see QueryWorkloadConfig + * @see EnforcementProfile + * @see PropagationScheme + */ +public class NodeConfig extends BaseJsonConfig { + + public enum Type { + BROKER_NODE("brokerNode"), + SERVER_NODE("serverNode"); + + private final String _value; + + Type(String jsonValue) { + _value = jsonValue; + } + + @JsonValue + public String getJsonValue() { + return _value; + } + + @JsonCreator + public static Type forValue(String value) { + if (value == null) { + return null; + } + // Normalize the input to lower case and trim spaces + String normalized = value.toLowerCase().trim(); + for (Type type : Type.values()) { + if (type.getJsonValue().toLowerCase().equals(normalized)) { + return type; + } + } + throw new IllegalArgumentException("Invalid node type: " + value); + } + } + + private static final String NODE_TYPE = "nodeType"; + private static final String ENFORCEMENT_PROFILE = "enforcementProfile"; + private static final String PROPAGATION_SCHEME = "propagationScheme"; + + /** + * The role of this node within the query workload, indicating whether it directly serves + * queries or acts as an intermediate forwarding node. + */ + @JsonPropertyDescription("Describes the type of node") + private Type _nodeType; + + /** + * The resource enforcement profile for this node, defining limits on CPU and memory + * usage for queries under this workload. 
+ */ + @JsonPropertyDescription("Describes the enforcement profile for the node") + private EnforcementProfile _enforcementProfile; + + /** + * Optional propagation scheme that specifies how configuration settings are cascaded + * or shared with downstream nodes; may be null if no propagation is applied. + */ + @JsonPropertyDescription("Describes the propagation scheme for the node") + private PropagationScheme _propagationScheme; + + @JsonCreator + public NodeConfig( + @JsonProperty(NODE_TYPE) Type nodeType, + @JsonProperty(ENFORCEMENT_PROFILE) EnforcementProfile enforcementProfile, + @JsonProperty(PROPAGATION_SCHEME) @Nullable PropagationScheme propagationScheme) { + _nodeType = nodeType; + _enforcementProfile = enforcementProfile; + _propagationScheme = propagationScheme; + } + + public Type getNodeType() { + return _nodeType; + } + + public EnforcementProfile getEnforcementProfile() { + return _enforcementProfile; + } + + public PropagationScheme getPropagationScheme() { + return _propagationScheme; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java new file mode 100644 index 0000000000..ac68bbea68 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.pinot.spi.config.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.List;
import org.apache.pinot.spi.config.BaseJsonConfig;

/**
 * Defines how configuration settings are propagated across workloads.
 * <p>
 * A PropagationScheme determines the scope and specific values (e.g., tables or tenants)
 * to which workload settings should be applied. This allows selective cascading
 * of resource and query limits across different instances.
 * </p>
 *
 * @see QueryWorkloadConfig
 * @see NodeConfig
 */
public class PropagationScheme extends BaseJsonConfig {

  /**
   * Enumerates the propagation scheme types that control the scope of propagation.
   * <p>
   * - TABLE: Propagate settings at the per-table level.<br>
   * - TENANT: Propagate settings at the tenant (logical group) level.
   * </p>
   */
  public enum Type {
    /** Propagate workload settings to individual tables. */
    TABLE("table"),
    /** Propagate workload settings to all tables under a tenant. */
    TENANT("tenant");

    private final String _value;

    Type(String value) {
      _value = value;
    }

    /**
     * Returns the JSON string representation of this propagation type.
     *
     * @return the JSON value corresponding to this Type (e.g., "table", "tenant")
     */
    @JsonValue
    public String getJsonValue() {
      return _value;
    }

    /**
     * Parses a JSON string into the corresponding Type enum.
     * <p>
     * Accepts case-insensitive and trimmed input matching defined JSON values.
     * </p>
     *
     * @param value JSON string to parse (may be null)
     * @return the matching Type enum, or null if input is null
     * @throws IllegalArgumentException if the input does not match any Type
     */
    @JsonCreator
    public static Type forValue(String value) {
      if (value == null) {
        return null;
      }
      String normalized = value.trim();
      for (Type type : Type.values()) {
        // equalsIgnoreCase is locale-independent for these ASCII values, unlike
        // toLowerCase() with the default locale.
        if (type._value.equalsIgnoreCase(normalized)) {
          return type;
        }
      }
      throw new IllegalArgumentException("Invalid propagation scheme type: " + value);
    }
  }

  private static final String PROPAGATION_TYPE = "propagationType";
  private static final String VALUES = "values";

  /**
   * The type of propagation to apply (per-table or per-tenant).
   */
  @JsonPropertyDescription("Describes the type of propagation scheme")
  private Type _propagationType;

  /**
   * The specific identifiers (table names or tenant names) to which settings apply.
   */
  @JsonPropertyDescription("Describes the values of the propagation scheme")
  private List<String> _values;

  /**
   * Constructs a PropagationScheme with the given type and target values.
   *
   * @param propagationType the Type of propagation (TABLE or TENANT)
   * @param values the list of identifiers (tables or tenants) for propagation
   */
  @JsonCreator
  public PropagationScheme(@JsonProperty(PROPAGATION_TYPE) Type propagationType,
      @JsonProperty(VALUES) List<String> values) {
    _propagationType = propagationType;
    _values = values;
  }

  /**
   * Returns the configured propagation type.
   *
   * @return the Type enum indicating propagation scope
   */
  public Type getPropagationType() {
    return _propagationType;
  }

  /**
   * Returns the list of target identifiers for propagation.
   *
   * @return list of table names or tenant names
   */
  public List<String> getValues() {
    return _values;
  }

  /**
   * Sets the propagation type.
   *
   * @param propagationType new Type to define propagation scope
   */
  public void setPropagationType(Type propagationType) {
    _propagationType = propagationType;
  }

  /**
   * Sets the target identifiers for propagation.
   *
   * @param values list of table or tenant names to apply settings to
   */
  public void setValues(List<String> values) {
    _values = values;
  }
}
package org.apache.pinot.spi.config.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import java.util.List;
import org.apache.pinot.spi.config.BaseJsonConfig;

/**
 * Represents the configuration for a named query workload in a Pinot Helix cluster.
 * <p>
 * A QueryWorkload groups a set of nodes and associated configuration parameters under a single workload name.
 * Workloads are applied at the cluster level: individual queries specify the workload they belong to by
 * providing the workload name in their query options. This allows us to manage and enforce isolation by limiting
 * the resources available to queries based on their workload classification.
 * </p>
 * <p><strong>Example:</strong></p>
 * <pre>{@code
 * {
 *   "queryWorkloadName": "analytics",
 *   "nodeConfigs": [
 *     {
 *       "nodeType": "brokerNode",
 *       "enforcementProfile": {
 *         "cpuCostNs": 1000000,
 *         "memoryCostBytes": 10000000
 *       },
 *       "propagationScheme": {
 *         "propagationType": "TABLE",
 *         "values": ["airlineStats"]
 *       }
 *     },
 *     {
 *       "nodeType": "serverNode",
 *       "enforcementProfile": {
 *         "cpuCostNs": 2000000,
 *         "memoryCostBytes": 20000000
 *       },
 *       "propagationScheme": {
 *         "propagationType": "TENANT",
 *         "values": ["tenantA", "tenantB"]
 *       }
 *     }
 *   ]
 * }
 * }</pre>
 *
 * @see NodeConfig
 */
public class QueryWorkloadConfig extends BaseJsonConfig {

  public static final String QUERY_WORKLOAD_NAME = "queryWorkloadName";
  public static final String NODE_CONFIGS = "nodeConfigs";

  @JsonPropertyDescription("Describes the name for the query workload, this should be unique across the zk cluster")
  private String _queryWorkloadName;

  @JsonPropertyDescription("Describes the node configs for the query workload")
  private List<NodeConfig> _nodeConfigs;

  /**
   * Constructs a new QueryWorkloadConfig instance.
   * <p>
   * NOTE: the Javadoc must precede the annotation; the original placed {@code @JsonCreator} before
   * the doc comment, which leaves the comment dangling and unattached to this constructor.
   * </p>
   *
   * @param queryWorkloadName unique name identifying this workload across the cluster
   * @param nodeConfigs list of per-node configurations for this workload
   */
  @JsonCreator
  public QueryWorkloadConfig(@JsonProperty(QUERY_WORKLOAD_NAME) String queryWorkloadName,
      @JsonProperty(NODE_CONFIGS) List<NodeConfig> nodeConfigs) {
    _queryWorkloadName = queryWorkloadName;
    _nodeConfigs = nodeConfigs;
  }

  /**
   * Returns the unique name of this query workload.
   *
   * @return the workload name used by queries to specify this workload
   */
  public String getQueryWorkloadName() {
    return _queryWorkloadName;
  }

  /**
   * Returns the list of node-specific configurations for this workload.
   *
   * @return list of NodeConfig objects for this workload
   */
  public List<NodeConfig> getNodeConfigs() {
    return _nodeConfigs;
  }
}