This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3a75d4abb2 Binary Workload Scheduler for constrained execution of a set of queries (#13847) 3a75d4abb2 is described below commit 3a75d4abb2c897b5b368ea55ffd7a4e6ea8ba0e5 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Sat Aug 24 09:53:01 2024 -0700 Binary Workload Scheduler for constrained execution of a set of queries (#13847) * Binary Workload Scheduler * Address review comments --- .../apache/pinot/common/metrics/ServerMeter.java | 4 + .../apache/pinot/common/metrics/ServerTimer.java | 3 + .../common/utils/config/QueryOptionsUtils.java | 4 + .../query/scheduler/BinaryWorkloadScheduler.java | 219 +++++++++++++++++++++ .../query/scheduler/QuerySchedulerFactory.java | 4 + .../query/scheduler/SecondaryWorkloadQueue.java | 167 ++++++++++++++++ .../resources/BinaryWorkloadResourceManager.java | 105 ++++++++++ .../query/scheduler/QuerySchedulerFactoryTest.java | 5 + .../apache/pinot/spi/utils/CommonConstants.java | 8 + 9 files changed, 519 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 8cadec6bfc..799c8790c2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -30,6 +30,10 @@ public enum ServerMeter implements AbstractMetrics.Meter { REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true), RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true), SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true), + NUM_SECONDARY_QUERIES("queries", false), + NUM_SECONDARY_QUERIES_SCHEDULED("queries", false), + SERVER_OUT_OF_CAPACITY_EXCEPTIONS("exceptions", false), + QUERY_EXECUTION_EXCEPTIONS("exceptions", false), HELIX_ZOOKEEPER_RECONNECTS("reconnects", true), DELETED_SEGMENT_COUNT("segments", false), diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java index b3e5e70641..63b42440a6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java @@ -59,6 +59,9 @@ public enum ServerTimer implements AbstractMetrics.Timer { DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false, "Total time taken to delete expired dedup primary keys based on metadataTTL or deletedKeysTTL"), + SECONDARY_Q_WAIT_TIME_MS("milliseconds", false, + "Time spent waiting in the secondary queue when BinaryWorkloadScheduler is used."), + // Multi-stage /** * Time spent building the hash table for the join. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index fe1b348a28..d1900f6a8f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -310,4 +310,8 @@ public class QueryOptionsUtils { public static boolean isSkipUnavailableServers(Map<String, String> queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS)); } + + public static boolean isSecondaryWorkload(Map<String, String> queryOptions) { + return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD)); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java new file mode 100644 index 0000000000..93e84c1e6a --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java @@ -0,0 +1,219 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.scheduler; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerQueryPhase; +import org.apache.pinot.common.metrics.ServerTimer; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager; +import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This scheduler is designed to deal with 
two types of workloads + * 1. Primary Workloads -> regular queries from the application + * 2. Secondary Workloads -> adhoc queries fired from tools, testing, etc + * + * + * Primary Workload Queries + * Primary workload queries are executed with priority and submitted to the Runner threads as and when they arrive. + * The resources used by a primary workload query are not capped. + * + * Secondary Workload Queries + * - Secondary workload queries are identified using a query option -> "SET isSecondaryWorkload=true" + * - Secondary workload queries are contained as follows: + * - Restrictions on number of runner threads available to process secondary queries + * - Restrictions on total number of worker threads available to process a single secondary query + * - Restrictions on total number of worker threads available to process all in-progress secondary queries + */ +public class BinaryWorkloadScheduler extends QueryScheduler { + private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadScheduler.class); + + public static final String MAX_SECONDARY_QUERIES = "binarywlm.maxSecondaryRunnerThreads"; + public static final int DEFAULT_MAX_SECONDARY_QUERIES = 5; + + // Secondary Workload Runners. 
+ private final int _numSecondaryRunners; + private final Semaphore _secondaryRunnerSemaphore; + + private final SecondaryWorkloadQueue _secondaryQueryQ; + + Thread _scheduler; + + public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics, + LongAccumulator latestQueryTime) { + super(config, queryExecutor, new BinaryWorkloadResourceManager(config), metrics, latestQueryTime); + + _secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager); + _numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES, DEFAULT_MAX_SECONDARY_QUERIES); + LOGGER.info("numSecondaryRunners={}", _numSecondaryRunners); + _secondaryRunnerSemaphore = new Semaphore(_numSecondaryRunners); + } + + @Override + public String name() { + return "BinaryWorkloadScheduler"; + } + + @Override + public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) { + if (!_isRunning) { + return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR); + } + + queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT); + if (!QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions())) { + QueryExecutorService queryExecutorService = _resourceManager.getExecutorService(queryRequest, null); + ListenableFutureTask<byte[]> queryTask = createQueryFutureTask(queryRequest, queryExecutorService); + _resourceManager.getQueryRunners().submit(queryTask); + return queryTask; + } + + final SchedulerQueryContext schedQueryContext = new SchedulerQueryContext(queryRequest); + try { + // Update metrics + _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(), ServerMeter.NUM_SECONDARY_QUERIES, 1L); + + _secondaryQueryQ.put(schedQueryContext); + } catch (OutOfCapacityException e) { + LOGGER.error("Out of capacity for query {} table {}, message: {}", queryRequest.getRequestId(), + queryRequest.getTableNameWithType(), e.getMessage()); + return 
immediateErrorResponse(queryRequest, QueryException.SERVER_OUT_OF_CAPACITY_ERROR); + } catch (Exception e) { + // We should not throw any exception other than OutOfCapacityException. Signal that there's an issue with + // the scheduler if any other exception is thrown. + LOGGER.error("Internal error for query {} table {}, message {}", queryRequest.getRequestId(), + queryRequest.getTableNameWithType(), e.getMessage()); + return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR); + } + return schedQueryContext.getResultFuture(); + } + + @Override + public void start() { + super.start(); + _scheduler = getScheduler(); + _scheduler.setName("scheduler"); + // TODO: Consider setting a lower priority to avoid busy loop when all threads are busy processing queries. + _scheduler.setPriority(Thread.MAX_PRIORITY); + _scheduler.setDaemon(true); + _scheduler.start(); + } + + private Thread getScheduler() { + return new Thread(new Runnable() { + @Override + public void run() { + while (_isRunning) { + try { + _secondaryRunnerSemaphore.acquire(); + } catch (InterruptedException e) { + if (!_isRunning) { + LOGGER.info("Shutting down scheduler"); + } else { + LOGGER.error("Interrupt while acquiring semaphore. 
Exiting.", e); + } + break; + } + try { + final SchedulerQueryContext request = _secondaryQueryQ.take(); + if (request == null) { + continue; + } + ServerQueryRequest queryRequest = request.getQueryRequest(); + final QueryExecutorService executor = + _resourceManager.getExecutorService(queryRequest, request.getSchedulerGroup()); + final ListenableFutureTask<byte[]> queryFutureTask = createQueryFutureTask(queryRequest, executor); + queryFutureTask.addListener(new Runnable() { + @Override + public void run() { + executor.releaseWorkers(); + request.getSchedulerGroup().endQuery(); + _secondaryRunnerSemaphore.release(); + checkStopResourceManager(); + } + }, MoreExecutors.directExecutor()); + + // Update metrics + updateSecondaryWorkloadMetrics(queryRequest); + + request.setResultFuture(queryFutureTask); + request.getSchedulerGroup().startQuery(); + _resourceManager.getQueryRunners().submit(queryFutureTask); + } catch (Throwable t) { + LOGGER.error( + "Error in scheduler thread. This is indicative of a bug. Please report this. 
Server will continue " + + "with errors", t); + } + } + if (_isRunning) { + throw new RuntimeException("FATAL: Scheduler thread is quitting.....something went horribly wrong.....!!!"); + } else { + failAllPendingQueries(); + } + } + }); + } + + private void updateSecondaryWorkloadMetrics(ServerQueryRequest queryRequest) { + long timeInQMs = System.currentTimeMillis() - queryRequest.getTimerContext().getQueryArrivalTimeMs(); + _serverMetrics.addTimedTableValue(queryRequest.getTableNameWithType(), ServerTimer.SECONDARY_Q_WAIT_TIME_MS, + timeInQMs, TimeUnit.MILLISECONDS); + _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(), + ServerMeter.NUM_SECONDARY_QUERIES_SCHEDULED, 1L); + } + + @Override + public void stop() { + super.stop(); + // without this, scheduler will never stop if there are no pending queries + if (_scheduler != null) { + _scheduler.interrupt(); + } + } + + private void checkStopResourceManager() { + if (!_isRunning && _secondaryRunnerSemaphore.availablePermits() == _numSecondaryRunners) { + _resourceManager.stop(); + } + } + + synchronized private void failAllPendingQueries() { + List<SchedulerQueryContext> pending = _secondaryQueryQ.drain(); + for (SchedulerQueryContext queryContext : pending) { + queryContext.setResultFuture( + immediateErrorResponse(queryContext.getQueryRequest(), QueryException.SERVER_SCHEDULER_DOWN_ERROR)); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java index 86c7170d93..69dd8bed36 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java @@ -45,6 +45,7 @@ public class QuerySchedulerFactory { public static final String FCFS_ALGORITHM = "fcfs"; public static final String TOKEN_BUCKET_ALGORITHM = "tokenbucket"; public 
static final String BOUNDED_FCFS_ALGORITHM = "bounded_fcfs"; + public static final String BINARY_WORKLOAD_ALGORITHM = "binary_workload"; public static final String ALGORITHM_NAME_CONFIG_KEY = "name"; public static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = FCFS_ALGORITHM; @@ -73,6 +74,9 @@ public class QuerySchedulerFactory { case BOUNDED_FCFS_ALGORITHM: scheduler = BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime); break; + case BINARY_WORKLOAD_ALGORITHM: + scheduler = new BinaryWorkloadScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime); + break; default: scheduler = getQuerySchedulerByClassName(schedulerName, schedulerConfig, queryExecutor, serverMetrics, latestQueryTime); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java new file mode 100644 index 0000000000..82736dbf0f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.scheduler; + + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.fcfs.FCFSSchedulerGroup; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Queue to maintain secondary workload queries. Used by the BinaryWorkloadScheduler. + */ +public class SecondaryWorkloadQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(SecondaryWorkloadQueue.class); + private static final String SECONDARY_WORKLOAD_GROUP_NAME = "Secondary"; + + public static final String SECONDARY_QUEUE_QUERY_TIMEOUT = "binarywlm.secondaryQueueQueryTimeout"; + private static final int DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC = 40; + + public static final String MAX_PENDING_SECONDARY_QUERIES = "binarywlm.maxPendingSecondaryQueries"; + private static final int DEFAULT_MAX_PENDING_SECONDARY_QUERIES = 20; + + public static final String QUEUE_WAKEUP_MS = "binarywlm.queueWakeupMs"; + private static final int DEFAULT_WAKEUP_MS = 1; + + private static int _wakeUpTimeMs; + private final int _maxPendingPerGroup; + + private final SchedulerGroup _schedulerGroup; + + private final Lock _queueLock = new ReentrantLock(); + private final Condition _queryReaderCondition = _queueLock.newCondition(); + private final ResourceManager _resourceManager; + private final int _queryDeadlineMs; + + public SecondaryWorkloadQueue(PinotConfiguration config, ResourceManager resourceManager) { + Preconditions.checkNotNull(config); + 
Preconditions.checkNotNull(resourceManager); + + _queryDeadlineMs = + config.getProperty(SECONDARY_QUEUE_QUERY_TIMEOUT, DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC) * 1000; + _wakeUpTimeMs = config.getProperty(QUEUE_WAKEUP_MS, DEFAULT_WAKEUP_MS); + _maxPendingPerGroup = config.getProperty(MAX_PENDING_SECONDARY_QUERIES, DEFAULT_MAX_PENDING_SECONDARY_QUERIES); + LOGGER.info("queryDeadlineMs={}, wakeupTimeMs={},maxPendingPerGroup={}", _queryDeadlineMs, _wakeUpTimeMs, + _maxPendingPerGroup); + _schedulerGroup = new FCFSSchedulerGroup(SECONDARY_WORKLOAD_GROUP_NAME); + _resourceManager = resourceManager; + } + + /** + * Adds a query to the secondary workload queue. + * @param query + * @throws OutOfCapacityException + */ + public void put(SchedulerQueryContext query) + throws OutOfCapacityException { + Preconditions.checkNotNull(query); + _queueLock.lock(); + try { + checkSchedulerGroupCapacity(query); + query.setSchedulerGroupContext(_schedulerGroup); + _schedulerGroup.addLast(query); + _queryReaderCondition.signal(); + } finally { + _queueLock.unlock(); + } + } + + /** + * Blocking call to read the next query + * @return + */ + @Nullable + public SchedulerQueryContext take() { + _queueLock.lock(); + try { + while (true) { + SchedulerQueryContext schedulerQueryContext; + while ((schedulerQueryContext = takeNextInternal()) == null) { + try { + _queryReaderCondition.await(_wakeUpTimeMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; + } + } + return schedulerQueryContext; + } + } finally { + _queueLock.unlock(); + } + } + + public List<SchedulerQueryContext> drain() { + List<SchedulerQueryContext> pending = new ArrayList<>(); + _queueLock.lock(); + try { + while (!_schedulerGroup.isEmpty()) { + pending.add(_schedulerGroup.removeFirst()); + } + } finally { + _queueLock.unlock(); + } + return pending; + } + + private SchedulerQueryContext takeNextInternal() { + long startTimeMs = System.currentTimeMillis(); + long deadlineEpochMillis = startTimeMs 
- _queryDeadlineMs; + + _schedulerGroup.trimExpired(deadlineEpochMillis); + if (_schedulerGroup.isEmpty() || !_resourceManager.canSchedule(_schedulerGroup)) { + return null; + } + + if (LOGGER.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("SchedulerInfo:"); + sb.append(_schedulerGroup.toString()); + ServerQueryRequest queryRequest = _schedulerGroup.peekFirst().getQueryRequest(); + sb.append(String.format(" Group: %s: [%d,%d,%d,%d]", _schedulerGroup.name(), + queryRequest.getTimerContext().getQueryArrivalTimeMs(), queryRequest.getRequestId(), + queryRequest.getSegmentsToQuery().size(), startTimeMs)); + LOGGER.debug(sb.toString()); + } + + SchedulerQueryContext query = _schedulerGroup.removeFirst(); + return query; + } + + private void checkSchedulerGroupCapacity(SchedulerQueryContext query) + throws OutOfCapacityException { + if (_schedulerGroup.numPending() >= _maxPendingPerGroup + && _schedulerGroup.totalReservedThreads() >= _resourceManager.getTableThreadsHardLimit()) { + throw new OutOfCapacityException(String.format( + "SchedulerGroup %s is out of capacity. numPending: %d, maxPending: %d, reservedThreads: %d " + + "threadsHardLimit: %d", _schedulerGroup.name(), _schedulerGroup.numPending(), _maxPendingPerGroup, + _schedulerGroup.totalReservedThreads(), _resourceManager.getTableThreadsHardLimit())); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java new file mode 100644 index 0000000000..fb7ffd93e3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.scheduler.resources; + +import com.google.common.base.Preconditions; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * ResourceManager for BinaryWorkloadScheduler. + */ +public class BinaryWorkloadResourceManager extends ResourceManager { + private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadResourceManager.class); + private final ResourceLimitPolicy _secondaryWorkloadPolicy; + + public BinaryWorkloadResourceManager(PinotConfiguration config) { + super(config); + _secondaryWorkloadPolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads); + } + + /** + * Returns an executor service that query executor can use like a dedicated + * service for submitting jobs for parallel execution. + * @param query + * @param accountant Accountant for a scheduler group + * @return UnboundedExecutorService for primary workload queries. For secondary workload queries, returns a + * BoundedAccountingExecutor service that limits the number of threads available for query execution. 
Query + * execution can submit tasks for parallel execution without need + * for limiting their parallelism. + */ + @Override + public QueryExecutorService getExecutorService(ServerQueryRequest query, SchedulerGroupAccountant accountant) { + if (!QueryOptionsUtils.isSecondaryWorkload(query.getQueryContext().getQueryOptions())) { + return getPrimaryWorkloadExecutorService(); + } + + return getSecondaryWorkloadExecutorService(query, accountant); + } + + @Override + public int getTableThreadsHardLimit() { + return _secondaryWorkloadPolicy.getTableThreadsHardLimit(); + } + + @Override + public int getTableThreadsSoftLimit() { + return _secondaryWorkloadPolicy.getTableThreadsSoftLimit(); + } + + private QueryExecutorService getPrimaryWorkloadExecutorService() { + return new QueryExecutorService() { + @Override + public void execute(Runnable command) { + _queryWorkers.submit(command); + } + }; + } + + private QueryExecutorService getSecondaryWorkloadExecutorService(ServerQueryRequest query, + SchedulerGroupAccountant accountant) { + int numSegments = query.getSegmentsToQuery().size(); + int queryThreadLimit = Math.max(1, Math.min(_secondaryWorkloadPolicy.getMaxThreadsPerQuery(), numSegments)); + int spareThreads = _secondaryWorkloadPolicy.getTableThreadsHardLimit() - accountant.totalReservedThreads(); + if (spareThreads <= 0) { + LOGGER.warn("UNEXPECTED: Attempt to schedule query uses more than the configured hard limit on threads"); + spareThreads = 1; + } else { + spareThreads = Math.min(spareThreads, queryThreadLimit); + } + Preconditions.checkState(spareThreads >= 1); + // We do not bound number of threads here by total available threads. We can potentially + // over-provision number of threads here. That is intentional and (potentially) good solution. + // Queries don't use their workers all the time. So, reserving workers leads to suboptimal resource + // utilization. We want to keep the pipe as full as possible for query workers. 
Overprovisioning is one + // way to achieve that (in fact, only way for us). There is a counter-argument to be made that overprovisioning + // can impact cache-lines and memory in general. + // We use this thread reservation only to determine priority based on resource utilization and not as a way to + // improve system performance (because we don't have good insight on that yet) + accountant.addReservedThreads(spareThreads); + // TODO: For 1 thread we should have the query run in the same queryRunner thread + // by supplying an executor service that is similar to Guava's directExecutor() + return new BoundedAccountingExecutor(_queryWorkers, spareThreads, accountant); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java index a495c6c5d7..9c4c59ea1e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java @@ -55,6 +55,11 @@ public class QuerySchedulerFactoryTest { queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime); assertTrue(queryScheduler instanceof BoundedFCFSScheduler); + config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, + QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM); + queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime); + assertTrue(queryScheduler instanceof BinaryWorkloadScheduler); + config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, TestQueryScheduler.class.getName()); queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime); assertTrue(queryScheduler instanceof TestQueryScheduler); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 2bfc61e7f4..21e5c87ee7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -428,6 +428,14 @@ public class CommonConstants { // If query submission causes an exception, still continue to submit the query to other servers public static final String SKIP_UNAVAILABLE_SERVERS = "skipUnavailableServers"; + + // Indicates that a query belongs to a secondary workload when + // using the BinaryWorkloadScheduler. The BinaryWorkloadScheduler + // divides queries into two workloads, primary and secondary. + // Primary workloads are executed in an Unbounded FCFS fashion. + // However, secondary workloads are executed in a constrained + // FCFS fashion with limited compute. + public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload"; } public static class QueryOptionValue { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org