This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a75d4abb2 Binary Workload Scheduler for constrained execution of a 
set of queries  (#13847)
3a75d4abb2 is described below

commit 3a75d4abb2c897b5b368ea55ffd7a4e6ea8ba0e5
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Sat Aug 24 09:53:01 2024 -0700

    Binary Workload Scheduler for constrained execution of a set of queries  
(#13847)
    
    * Binary Workload Scheduler
    
    * Address review comments
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   4 +
 .../apache/pinot/common/metrics/ServerTimer.java   |   3 +
 .../common/utils/config/QueryOptionsUtils.java     |   4 +
 .../query/scheduler/BinaryWorkloadScheduler.java   | 219 +++++++++++++++++++++
 .../query/scheduler/QuerySchedulerFactory.java     |   4 +
 .../query/scheduler/SecondaryWorkloadQueue.java    | 167 ++++++++++++++++
 .../resources/BinaryWorkloadResourceManager.java   | 105 ++++++++++
 .../query/scheduler/QuerySchedulerFactoryTest.java |   5 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   8 +
 9 files changed, 519 insertions(+)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 8cadec6bfc..799c8790c2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -30,6 +30,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
   RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
   SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
+  NUM_SECONDARY_QUERIES("queries", false),
+  NUM_SECONDARY_QUERIES_SCHEDULED("queries", false),
+  SERVER_OUT_OF_CAPACITY_EXCEPTIONS("exceptions", false),
+
   QUERY_EXECUTION_EXCEPTIONS("exceptions", false),
   HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
   DELETED_SEGMENT_COUNT("segments", false),
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index b3e5e70641..63b42440a6 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -59,6 +59,9 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
       "Total time taken to delete expired dedup primary keys based on 
metadataTTL or deletedKeysTTL"),
 
+  SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
+      "Time spent waiting in the secondary queue when BinaryWorkloadScheduler 
is used."),
+
   // Multi-stage
   /**
    * Time spent building the hash table for the join.
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index fe1b348a28..d1900f6a8f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -310,4 +310,8 @@ public class QueryOptionsUtils {
   public static boolean isSkipUnavailableServers(Map<String, String> 
queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS));
   }
+
+  public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
new file mode 100644
index 0000000000..93e84c1e6a
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import 
org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This scheduler is designed to deal with two types of workloads
+ * 1. Primary Workloads -> regular queries from the application
+ * 2. Secondary Workloads -> adhoc queries fired from tools, testing, etc
+ *
+ *
+ * Primary Workload Queries
+ * Primary workload queries are executed with priority and submitted to the Runner threads as and when they arrive.
+ * The resources used by a primary workload query are not capped.
+ *
+ * Secondary Workload Queries
+ *   - Secondary workload queries are identified using a query option -> "SET 
isSecondaryWorkload=true"
+ *   - Secondary workload queries are contained as follows:
+ *       - Restrictions on number of runner threads available to process 
secondary queries
+ *       - Restrictions on total number of worker threads available to process 
a single secondary query
+ *       - Restrictions on total number of worker threads available to process 
all in-progress secondary queries
+ */
+public class BinaryWorkloadScheduler extends QueryScheduler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BinaryWorkloadScheduler.class);
+
+  public static final String MAX_SECONDARY_QUERIES = 
"binarywlm.maxSecondaryRunnerThreads";
+  public static final int DEFAULT_MAX_SECONDARY_QUERIES = 5;
+
+  // Secondary Workload Runners.
+  private final int _numSecondaryRunners;
+  private final Semaphore _secondaryRunnerSemaphore;
+
+  private final SecondaryWorkloadQueue _secondaryQueryQ;
+
+  Thread _scheduler;
+
+  public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor 
queryExecutor, ServerMetrics metrics,
+      LongAccumulator latestQueryTime) {
+    super(config, queryExecutor, new BinaryWorkloadResourceManager(config), 
metrics, latestQueryTime);
+
+    _secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager);
+    _numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES, 
DEFAULT_MAX_SECONDARY_QUERIES);
+    LOGGER.info("numSecondaryRunners={}", _numSecondaryRunners);
+    _secondaryRunnerSemaphore = new Semaphore(_numSecondaryRunners);
+  }
+
+  @Override
+  public String name() {
+    return "BinaryWorkloadScheduler";
+  }
+
+  @Override
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    if (!_isRunning) {
+      return immediateErrorResponse(queryRequest, 
QueryException.SERVER_SCHEDULER_DOWN_ERROR);
+    }
+
+    
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
+    if 
(!QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions()))
 {
+      QueryExecutorService queryExecutorService = 
_resourceManager.getExecutorService(queryRequest, null);
+      ListenableFutureTask<byte[]> queryTask = 
createQueryFutureTask(queryRequest, queryExecutorService);
+      _resourceManager.getQueryRunners().submit(queryTask);
+      return queryTask;
+    }
+
+    final SchedulerQueryContext schedQueryContext = new 
SchedulerQueryContext(queryRequest);
+    try {
+      // Update metrics
+      _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(), 
ServerMeter.NUM_SECONDARY_QUERIES, 1L);
+
+      _secondaryQueryQ.put(schedQueryContext);
+    } catch (OutOfCapacityException e) {
+      LOGGER.error("Out of capacity for query {} table {}, message: {}", 
queryRequest.getRequestId(),
+          queryRequest.getTableNameWithType(), e.getMessage());
+      return immediateErrorResponse(queryRequest, 
QueryException.SERVER_OUT_OF_CAPACITY_ERROR);
+    } catch (Exception e) {
+      // No exception other than OutOfCapacityException is expected here. Signal that there's an issue with
+      // the scheduler if any other exception is thrown.
+      LOGGER.error("Internal error for query {} table {}, message {}", 
queryRequest.getRequestId(),
+          queryRequest.getTableNameWithType(), e.getMessage());
+      return immediateErrorResponse(queryRequest, 
QueryException.SERVER_SCHEDULER_DOWN_ERROR);
+    }
+    return schedQueryContext.getResultFuture();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    _scheduler = getScheduler();
+    _scheduler.setName("scheduler");
+    // TODO: Consider setting a lower priority to avoid a busy loop when all threads are busy processing queries.
+    _scheduler.setPriority(Thread.MAX_PRIORITY);
+    _scheduler.setDaemon(true);
+    _scheduler.start();
+  }
+
+  private Thread getScheduler() {
+    return new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (_isRunning) {
+          try {
+            _secondaryRunnerSemaphore.acquire();
+          } catch (InterruptedException e) {
+            if (!_isRunning) {
+              LOGGER.info("Shutting down scheduler");
+            } else {
+              LOGGER.error("Interrupt while acquiring semaphore. Exiting.", e);
+            }
+            break;
+          }
+          try {
+            final SchedulerQueryContext request = _secondaryQueryQ.take();
+            if (request == null) {
+              continue;
+            }
+            ServerQueryRequest queryRequest = request.getQueryRequest();
+            final QueryExecutorService executor =
+                _resourceManager.getExecutorService(queryRequest, 
request.getSchedulerGroup());
+            final ListenableFutureTask<byte[]> queryFutureTask = 
createQueryFutureTask(queryRequest, executor);
+            queryFutureTask.addListener(new Runnable() {
+              @Override
+              public void run() {
+                executor.releaseWorkers();
+                request.getSchedulerGroup().endQuery();
+                _secondaryRunnerSemaphore.release();
+                checkStopResourceManager();
+              }
+            }, MoreExecutors.directExecutor());
+
+            // Update metrics
+            updateSecondaryWorkloadMetrics(queryRequest);
+
+            request.setResultFuture(queryFutureTask);
+            request.getSchedulerGroup().startQuery();
+            _resourceManager.getQueryRunners().submit(queryFutureTask);
+          } catch (Throwable t) {
+            LOGGER.error(
+                "Error in scheduler thread. This is indicative of a bug. 
Please report this. Server will continue "
+                    + "with errors", t);
+          }
+        }
+        if (_isRunning) {
+          throw new RuntimeException("FATAL: Scheduler thread is 
quitting.....something went horribly wrong.....!!!");
+        } else {
+          failAllPendingQueries();
+        }
+      }
+    });
+  }
+
+  private void updateSecondaryWorkloadMetrics(ServerQueryRequest queryRequest) 
{
+    long timeInQMs = System.currentTimeMillis() - 
queryRequest.getTimerContext().getQueryArrivalTimeMs();
+    _serverMetrics.addTimedTableValue(queryRequest.getTableNameWithType(), 
ServerTimer.SECONDARY_Q_WAIT_TIME_MS,
+        timeInQMs, TimeUnit.MILLISECONDS);
+    _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(),
+        ServerMeter.NUM_SECONDARY_QUERIES_SCHEDULED, 1L);
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    // without this, scheduler will never stop if there are no pending queries
+    if (_scheduler != null) {
+      _scheduler.interrupt();
+    }
+  }
+
+  private void checkStopResourceManager() {
+    if (!_isRunning && _secondaryRunnerSemaphore.availablePermits() == 
_numSecondaryRunners) {
+      _resourceManager.stop();
+    }
+  }
+
+  synchronized private void failAllPendingQueries() {
+    List<SchedulerQueryContext> pending = _secondaryQueryQ.drain();
+    for (SchedulerQueryContext queryContext : pending) {
+      queryContext.setResultFuture(
+          immediateErrorResponse(queryContext.getQueryRequest(), 
QueryException.SERVER_SCHEDULER_DOWN_ERROR));
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
index 86c7170d93..69dd8bed36 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
@@ -45,6 +45,7 @@ public class QuerySchedulerFactory {
   public static final String FCFS_ALGORITHM = "fcfs";
   public static final String TOKEN_BUCKET_ALGORITHM = "tokenbucket";
   public static final String BOUNDED_FCFS_ALGORITHM = "bounded_fcfs";
+  public static final String BINARY_WORKLOAD_ALGORITHM = "binary_workload";
   public static final String ALGORITHM_NAME_CONFIG_KEY = "name";
   public static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = 
FCFS_ALGORITHM;
 
@@ -73,6 +74,9 @@ public class QuerySchedulerFactory {
       case BOUNDED_FCFS_ALGORITHM:
         scheduler = BoundedFCFSScheduler.create(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime);
         break;
+      case BINARY_WORKLOAD_ALGORITHM:
+        scheduler = new BinaryWorkloadScheduler(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime);
+        break;
       default:
         scheduler =
             getQuerySchedulerByClassName(schedulerName, schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java
new file mode 100644
index 0000000000..82736dbf0f
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.fcfs.FCFSSchedulerGroup;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Queue to maintain secondary workload queries. Used by the 
BinaryWorkloadScheduler.
+ */
+public class SecondaryWorkloadQueue {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SecondaryWorkloadQueue.class);
+  private static final String SECONDARY_WORKLOAD_GROUP_NAME = "Secondary";
+
+  public static final String SECONDARY_QUEUE_QUERY_TIMEOUT = 
"binarywlm.secondaryQueueQueryTimeout";
+  private static final int DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC = 40;
+
+  public static final String MAX_PENDING_SECONDARY_QUERIES = 
"binarywlm.maxPendingSecondaryQueries";
+  private static final int DEFAULT_MAX_PENDING_SECONDARY_QUERIES = 20;
+
+  public static final String QUEUE_WAKEUP_MS = "binarywlm.queueWakeupMs";
+  private static final int DEFAULT_WAKEUP_MS = 1;
+
+  private static int _wakeUpTimeMs;
+  private final int _maxPendingPerGroup;
+
+  private final SchedulerGroup _schedulerGroup;
+
+  private final Lock _queueLock = new ReentrantLock();
+  private final Condition _queryReaderCondition = _queueLock.newCondition();
+  private final ResourceManager _resourceManager;
+  private final int _queryDeadlineMs;
+
+  public SecondaryWorkloadQueue(PinotConfiguration config, ResourceManager 
resourceManager) {
+    Preconditions.checkNotNull(config);
+    Preconditions.checkNotNull(resourceManager);
+
+    _queryDeadlineMs =
+        config.getProperty(SECONDARY_QUEUE_QUERY_TIMEOUT, 
DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC) * 1000;
+    _wakeUpTimeMs = config.getProperty(QUEUE_WAKEUP_MS, DEFAULT_WAKEUP_MS);
+    _maxPendingPerGroup = config.getProperty(MAX_PENDING_SECONDARY_QUERIES, 
DEFAULT_MAX_PENDING_SECONDARY_QUERIES);
+    LOGGER.info("queryDeadlineMs={}, wakeupTimeMs={},maxPendingPerGroup={}", 
_queryDeadlineMs, _wakeUpTimeMs,
+        _maxPendingPerGroup);
+    _schedulerGroup = new FCFSSchedulerGroup(SECONDARY_WORKLOAD_GROUP_NAME);
+    _resourceManager = resourceManager;
+  }
+
+  /**
+   * Adds a query to the secondary workload queue.
+   * @param query query to enqueue; must not be null
+   * @throws OutOfCapacityException if the secondary scheduler group is out of capacity
+   */
+  public void put(SchedulerQueryContext query)
+      throws OutOfCapacityException {
+    Preconditions.checkNotNull(query);
+    _queueLock.lock();
+    try {
+      checkSchedulerGroupCapacity(query);
+      query.setSchedulerGroupContext(_schedulerGroup);
+      _schedulerGroup.addLast(query);
+      _queryReaderCondition.signal();
+    } finally {
+      _queueLock.unlock();
+    }
+  }
+
+  /**
+   * Blocking call to read the next query.
+   * @return the next schedulable query, or null if the thread is interrupted while waiting
+   */
+  @Nullable
+  public SchedulerQueryContext take() {
+    _queueLock.lock();
+    try {
+      while (true) {
+        SchedulerQueryContext schedulerQueryContext;
+        while ((schedulerQueryContext = takeNextInternal()) == null) {
+          try {
+            _queryReaderCondition.await(_wakeUpTimeMs, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            return null;
+          }
+        }
+        return schedulerQueryContext;
+      }
+    } finally {
+      _queueLock.unlock();
+    }
+  }
+
+  public List<SchedulerQueryContext> drain() {
+    List<SchedulerQueryContext> pending = new ArrayList<>();
+    _queueLock.lock();
+    try {
+      while (!_schedulerGroup.isEmpty()) {
+        pending.add(_schedulerGroup.removeFirst());
+      }
+    } finally {
+      _queueLock.unlock();
+    }
+    return pending;
+  }
+
+  private SchedulerQueryContext takeNextInternal() {
+    long startTimeMs = System.currentTimeMillis();
+    long deadlineEpochMillis = startTimeMs - _queryDeadlineMs;
+
+    _schedulerGroup.trimExpired(deadlineEpochMillis);
+    if (_schedulerGroup.isEmpty() || 
!_resourceManager.canSchedule(_schedulerGroup)) {
+      return null;
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("SchedulerInfo:");
+      sb.append(_schedulerGroup.toString());
+      ServerQueryRequest queryRequest = 
_schedulerGroup.peekFirst().getQueryRequest();
+      sb.append(String.format(" Group: %s: [%d,%d,%d,%d]", 
_schedulerGroup.name(),
+          queryRequest.getTimerContext().getQueryArrivalTimeMs(), 
queryRequest.getRequestId(),
+          queryRequest.getSegmentsToQuery().size(), startTimeMs));
+      LOGGER.debug(sb.toString());
+    }
+
+    SchedulerQueryContext query = _schedulerGroup.removeFirst();
+    return query;
+  }
+
+  private void checkSchedulerGroupCapacity(SchedulerQueryContext query)
+      throws OutOfCapacityException {
+    if (_schedulerGroup.numPending() >= _maxPendingPerGroup
+        && _schedulerGroup.totalReservedThreads() >= 
_resourceManager.getTableThreadsHardLimit()) {
+      throw new OutOfCapacityException(String.format(
+          "SchedulerGroup %s is out of capacity. numPending: %d, maxPending: 
%d, reservedThreads: %d "
+              + "threadsHardLimit: %d", _schedulerGroup.name(), 
_schedulerGroup.numPending(), _maxPendingPerGroup,
+          _schedulerGroup.totalReservedThreads(), 
_resourceManager.getTableThreadsHardLimit()));
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
new file mode 100644
index 0000000000..fb7ffd93e3
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResourceManager for BinaryWorkloadScheduler.
+ */
+public class BinaryWorkloadResourceManager extends ResourceManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
+  private final ResourceLimitPolicy _secondaryWorkloadPolicy;
+
+  public BinaryWorkloadResourceManager(PinotConfiguration config) {
+    super(config);
+    _secondaryWorkloadPolicy = new ResourceLimitPolicy(config, 
_numQueryWorkerThreads);
+  }
+
+  /**
+   * Returns an executor service that query executor can use like a dedicated
+   * service for submitting jobs for parallel execution.
+   * @param query
+   * @param accountant Accountant for a scheduler group
+   * @return UnboundedExecutorService for primary workload queries. For 
secondary workload queries, returns a
+   * BoundedAccountingExecutor service that limits the number of threads 
available for query execution. Query
+   * execution can submit tasks for parallel execution without need
+   * for limiting their parallelism.
+   */
+  @Override
+  public QueryExecutorService getExecutorService(ServerQueryRequest query, 
SchedulerGroupAccountant accountant) {
+    if 
(!QueryOptionsUtils.isSecondaryWorkload(query.getQueryContext().getQueryOptions()))
 {
+      return getPrimaryWorkloadExecutorService();
+    }
+
+    return getSecondaryWorkloadExecutorService(query, accountant);
+  }
+
+  @Override
+  public int getTableThreadsHardLimit() {
+    return _secondaryWorkloadPolicy.getTableThreadsHardLimit();
+  }
+
+  @Override
+  public int getTableThreadsSoftLimit() {
+    return _secondaryWorkloadPolicy.getTableThreadsSoftLimit();
+  }
+
+  private QueryExecutorService getPrimaryWorkloadExecutorService() {
+    return new QueryExecutorService() {
+      @Override
+      public void execute(Runnable command) {
+        _queryWorkers.submit(command);
+      }
+    };
+  }
+
+  private QueryExecutorService 
getSecondaryWorkloadExecutorService(ServerQueryRequest query,
+      SchedulerGroupAccountant accountant) {
+    int numSegments = query.getSegmentsToQuery().size();
+    int queryThreadLimit = Math.max(1, 
Math.min(_secondaryWorkloadPolicy.getMaxThreadsPerQuery(), numSegments));
+    int spareThreads = _secondaryWorkloadPolicy.getTableThreadsHardLimit() - 
accountant.totalReservedThreads();
+    if (spareThreads <= 0) {
+      LOGGER.warn("UNEXPECTED: Attempt to schedule query uses more than the 
configured hard limit on threads");
+      spareThreads = 1;
+    } else {
+      spareThreads = Math.min(spareThreads, queryThreadLimit);
+    }
+    Preconditions.checkState(spareThreads >= 1);
+    // We do not bound the number of threads here by the total available threads, so we can potentially
+    // over-provision. That is intentional and a (potentially) good solution.
+    // Queries don't use their workers all the time, so reserving workers leads to suboptimal resource
+    // utilization. We want to keep the pipe as full as possible for query workers. Overprovisioning is one
+    // way to achieve that (in fact, the only way for us). There is a counter-argument to be made that
+    // overprovisioning can impact cache-lines and memory in general.
+    // We use this thread reservation only to determine priority based on resource utilization and not as a way to
+    // improve system performance (because we don't have good insight on that yet).
+    accountant.addReservedThreads(spareThreads);
+    // TODO: For 1 thread we should have the query run in the same queryRunner thread
+    // by supplying an executor service that is similar to Guava's directExecutor()
+    return new BoundedAccountingExecutor(_queryWorkers, spareThreads, 
accountant);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
index a495c6c5d7..9c4c59ea1e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
@@ -55,6 +55,11 @@ public class QuerySchedulerFactoryTest {
     queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
     assertTrue(queryScheduler instanceof BoundedFCFSScheduler);
 
+    config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
+        QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
+    assertTrue(queryScheduler instanceof BinaryWorkloadScheduler);
+
     config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, 
TestQueryScheduler.class.getName());
     queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
     assertTrue(queryScheduler instanceof TestQueryScheduler);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2bfc61e7f4..21e5c87ee7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -428,6 +428,14 @@ public class CommonConstants {
 
         // If query submission causes an exception, still continue to submit 
the query to other servers
         public static final String SKIP_UNAVAILABLE_SERVERS = 
"skipUnavailableServers";
+
+        // Indicates that a query belongs to a secondary workload when
+        // using the BinaryWorkloadScheduler. The BinaryWorkloadScheduler
+        // divides queries into two workloads, primary and secondary.
+        // Primary workloads are executed in an unbounded FCFS fashion.
+        // However, secondary workloads are executed in a constrained
+        // FCFS fashion with limited compute.
+        public static final String IS_SECONDARY_WORKLOAD = 
"isSecondaryWorkload";
       }
 
       public static class QueryOptionValue {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to