This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb06abc2c2f [Query Resource Isolation] Fix Refresh message (#16636)
fb06abc2c2f is described below

commit fb06abc2c2f0b956e6cdbe4ec2875f3de9abcce9
Author: Praveen <[email protected]>
AuthorDate: Thu Aug 28 12:52:20 2025 -0700

    [Query Resource Isolation] Fix Refresh message (#16636)
    
    * Fix Refresh message
    
    * delete queryworkload message handler
    
    * info -> debug logs
---
 .../BrokerUserDefinedMessageHandlerFactory.java    | 28 ++++++-
 .../messages/QueryWorkloadRefreshMessage.java      |  2 +-
 .../server/starter/helix/BaseServerStarter.java    |  6 --
 .../helix/QueryWorkloadMessageHandlerFactory.java  | 87 ----------------------
 .../helix/SegmentMessageHandlerFactory.java        | 51 +++++++++++++
 .../spi/accounting/WorkloadBudgetManager.java      | 15 +++-
 6 files changed, 88 insertions(+), 101 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 1b2e7ed0452..68dbc098bd8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,20 +269,39 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac
   private static class QueryWorkloadRefreshMessageHandler extends MessageHandler {
     final String _queryWorkloadName;
     final InstanceCost _instanceCost;
+    final String _messageType;
 
     QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage,
         NotificationContext context) {
       super(queryWorkloadRefreshMessage, context);
       _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
       _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+      _messageType = queryWorkloadRefreshMessage.getMsgSubType();
     }
 
     @Override
     public HelixTaskResult handleMessage() {
-      // TODO: Add logic to invoke the query workload manager to refresh/delete the query workload config
-      HelixTaskResult result = new HelixTaskResult();
-      result.setSuccess(true);
-      return result;
+      LOGGER.info("Handling query workload message: {}", _message);
+      try {
+        if (_messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+          Tracing.ThreadAccountantOps.getWorkloadBudgetManager().deleteWorkload(_queryWorkloadName);
+        } else if (_messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+          if (_instanceCost == null) {
+            throw new IllegalStateException(
+                "Instance cost is not provided for refreshing query workload: " + _queryWorkloadName);
+          }
+          Tracing.ThreadAccountantOps.getWorkloadBudgetManager()
+            .addOrUpdateWorkload(_queryWorkloadName, _instanceCost.getCpuCostNs(), _instanceCost.getMemoryCostBytes());
+        } else {
+          throw new IllegalStateException("Unknown message type: " + _messageType);
+        }
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        return result;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to handle query workload message: {}", _queryWorkloadName, e);
+        throw e;
+      }
     }
 
     @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
index 85f4da123ed..70c13193f60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
@@ -54,7 +54,7 @@ public class QueryWorkloadRefreshMessage extends Message {
   public QueryWorkloadRefreshMessage(Message message) {
     super(message.getRecord());
     if (!message.getMsgSubType().equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
-        || !message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+        && !message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
       throw new IllegalArgumentException("Unknown message subtype:" + message.getMsgSubType());
     }
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 0c59296e8f8..e7397a58776 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -750,12 +750,6 @@ public abstract class BaseServerStarter implements ServiceStartable {
         new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
     _helixManager.getMessagingService()
         .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
-    // Query workload message handler factory
-    QueryWorkloadMessageHandlerFactory queryWorkloadMessageHandlerFactory =
-        new QueryWorkloadMessageHandlerFactory(serverMetrics);
-    _helixManager.getMessagingService()
-        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-            queryWorkloadMessageHandlerFactory);
 
     serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
     _helixManager.addPreConnectCallback(
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
deleted file mode 100644
index bdf91fc7ff5..00000000000
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.helix;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.Message;
-import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.spi.config.workload.InstanceCost;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QueryWorkloadMessageHandlerFactory implements MessageHandlerFactory {
-    private static final Logger LOGGER = LoggerFactory.getLogger(QueryWorkloadMessageHandlerFactory.class);
-    private final ServerMetrics _metrics;
-
-    public QueryWorkloadMessageHandlerFactory(ServerMetrics metrics) {
-        _metrics = metrics;
-    }
-
-    @Override
-    public MessageHandler createHandler(Message message, NotificationContext context) {
-       String messageType = message.getMsgSubType();
-       if (messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
-           || messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
-           return new QueryWorkloadRefreshMessageHandler(new QueryWorkloadRefreshMessage(message), context);
-        } else {
-            throw new IllegalArgumentException("Unknown message subtype: " + messageType);
-        }
-    }
-
-    // Gets called once during start up. We must return the same message type that this factory is registered for.
-    @Override
-    public String getMessageType() {
-        return Message.MessageType.USER_DEFINE_MSG.toString();
-    }
-
-    @Override
-    public void reset() {
-        LOGGER.info("Reset called");
-    }
-
-    private static class QueryWorkloadRefreshMessageHandler extends MessageHandler {
-        final String _queryWorkloadName;
-        final InstanceCost _instanceCost;
-
-        QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage,
-                                           NotificationContext context) {
-            super(queryWorkloadRefreshMessage, context);
-            _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
-            _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
-        }
-
-        @Override
-        public HelixTaskResult handleMessage() {
-            // TODO: Add logic to invoke the query workload manager to refresh/delete the query workload config
-            HelixTaskResult result = new HelixTaskResult();
-            result.setSuccess(true);
-            return result;
-        }
-
-        @Override
-        public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) {
-            LOGGER.error("Got error while refreshing query workload config for query workload: {} (error code: {},"
-                    + " error type: {})", _queryWorkloadName, errorCode, errorType, e);
-        }
-    }
-}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 3429ec0c3ce..a25b449810b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.Message;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.messages.ForceCommitMessage;
 import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
@@ -43,6 +44,8 @@ import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +80,9 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
       case TableConfigSchemaRefreshMessage.REFRESH_TABLE_CONFIG_AND_SCHEMA:
         return new TableSchemaRefreshMessageHandler(new TableConfigSchemaRefreshMessage(message), _metrics, context);
+      case QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE:
+      case QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE:
+        return new QueryWorkloadRefreshMessageHandler(new QueryWorkloadRefreshMessage(message), _metrics, context);
       default:
         LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
             message.getPartitionName());
@@ -273,6 +279,51 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     }
   }
 
+  private static class QueryWorkloadRefreshMessageHandler extends DefaultMessageHandler {
+    final String _queryWorkloadName;
+    final InstanceCost _instanceCost;
+    final String _messageType;
+
+    QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage,
+                                       ServerMetrics metrics, NotificationContext context) {
+      super(queryWorkloadRefreshMessage, metrics, context);
+      _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
+      _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+      _messageType = queryWorkloadRefreshMessage.getMsgSubType();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      LOGGER.info("Handling query workload message: {}", _message);
+      try {
+        if (_messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+          Tracing.ThreadAccountantOps.getWorkloadBudgetManager().deleteWorkload(_queryWorkloadName);
+        } else if (_messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+          if (_instanceCost == null) {
+            throw new IllegalStateException(
+                "Instance cost is not provided for refreshing query workload: " + _queryWorkloadName);
+          }
+          Tracing.ThreadAccountantOps.getWorkloadBudgetManager()
+            .addOrUpdateWorkload(_queryWorkloadName, _instanceCost.getCpuCostNs(), _instanceCost.getMemoryCostBytes());
+        } else {
+          throw new IllegalStateException("Unknown message type: " + _messageType);
+        }
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        return result;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to handle query workload message: {}", _queryWorkloadName, e);
+        throw e;
+      }
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) {
+      LOGGER.error("Got error while refreshing query workload config for query workload: {} (error code: {},"
+          + " error type: {})", _queryWorkloadName, errorCode, errorType, e);
+    }
+  }
+
   private static class DefaultMessageHandler extends MessageHandler {
     final String _segmentName;
     final String _tableNameWithType;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
index 83f03931c3f..ddd2734dc76 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
@@ -33,7 +33,7 @@ public class WorkloadBudgetManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadBudgetManager.class);
 
   private long _enforcementWindowMs;
-  private final ConcurrentHashMap<String, Budget> _workloadBudgets = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, Budget> _workloadBudgets;
   private final ScheduledExecutorService _resetScheduler = Executors.newSingleThreadScheduledExecutor();
   private volatile boolean _isEnabled;
 
@@ -45,6 +45,7 @@ public class WorkloadBudgetManager {
       LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op instance.");
       return;
     }
+    _workloadBudgets = new ConcurrentHashMap<>();
     _enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
         CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
     initSecondaryWorkloadBudget(config);
@@ -113,6 +114,15 @@ public class WorkloadBudgetManager {
         memoryBudgetBytes);
   }
 
+  public void deleteWorkload(String workload) {
+    if (!_isEnabled) {
+      LOGGER.info("WorkloadBudgetManager is disabled. Not deleting workload: {}", workload);
+      return;
+    }
+    _workloadBudgets.remove(workload);
+    LOGGER.info("Removed workload: {}", workload);
+  }
+
   /**
    * Collects workload stats for CPU and memory usage.
    * Could be overridden for custom implementations
@@ -172,8 +182,7 @@ public class WorkloadBudgetManager {
    * Periodically resets budgets at the end of each enforcement window (Thread-Safe).
    */
   private void startBudgetResetTask() {
-    // TODO(Vivek): Reduce logging verbosity. Maybe make it debug logs.
-    LOGGER.info("Starting budget reset task with enforcement window: {}ms", _enforcementWindowMs);
+    LOGGER.debug("Starting budget reset task with enforcement window: {}ms", _enforcementWindowMs);
     _resetScheduler.scheduleAtFixedRate(() -> {
       LOGGER.debug("Resetting all workload budgets.");
       // Also print the budget used in the last enforcement window.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to