This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 397874e548 Add Debug APIs that return Thread and Query Resource Usage (#13583) 397874e548 is described below commit 397874e548a8ba06ff95a0777c10909393f400c2 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Mon Jul 22 19:43:26 2024 +0530 Add Debug APIs that return Thread and Query Resource Usage (#13583) * Checkpoint * Checkpoint * Add javadocs for new interfaces. * Code linting and java docs. * Add two quick starts for resource tracking. * Add a javadoc * Dont serialize thread info * Add auth to query debug resource --- .../broker/api/resources/PinotBrokerDebug.java | 26 ++++++ .../CPUMemThreadLevelAccountingObjects.java | 23 +++++- .../PerQueryCPUMemAccountantFactory.java | 85 ++++++++++++++++++-- .../java/org/apache/pinot/core/auth/Actions.java | 1 + .../pinot/server/api/resources/DebugResource.java | 26 ++++++ .../pinot/spi/accounting/QueryResourceTracker.java | 46 +++++++++++ .../spi/accounting/ThreadResourceTracker.java | 52 ++++++++++++ .../accounting/ThreadResourceUsageAccountant.java | 14 ++++ .../java/org/apache/pinot/spi/trace/Tracing.java | 15 ++++ .../tools/MultiStageResourceTrackerQuickStart.java | 56 +++++++++++++ .../pinot/tools/MultistageEngineQuickStart.java | 24 +++--- .../SingleStageResourceTrackingQuickStart.java | 93 ++++++++++++++++++++++ .../apache/pinot/tools/utils/PinotConfigUtils.java | 22 +++++ .../apache/pinot/tools/utils/SampleQueries.java | 42 ++++++++++ 14 files changed, 503 insertions(+), 22 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index 11e35eb6f7..8311b5de87 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -27,6 +27,7 @@ import 
io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,7 +58,11 @@ import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; +import org.apache.pinot.spi.accounting.QueryResourceTracker; +import org.apache.pinot.spi.accounting.ThreadResourceTracker; +import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; @@ -269,4 +274,25 @@ public class PinotBrokerDebug { private long getRequestId() { return _requestIdGenerator.getAndIncrement(); } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/debug/threads/resourceUsage") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DEBUG_RESOURCE_USAGE) + @ApiOperation(value = "Get resource usage of threads") + public Collection<? extends ThreadResourceTracker> getThreadResourceUsage() { + ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant(); + return threadAccountant.getThreadResources(); + } + + @GET + @Path("debug/queries/resourceUsage") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DEBUG_RESOURCE_USAGE) + @ApiOperation(value = "Get current resource usage of queries in this service", notes = "This is a debug endpoint, " + + "and won't maintain backward compatibility") + public Collection<? 
extends QueryResourceTracker> getQueryUsage() { + ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant(); + return threadAccountant.getQueryResources().values(); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java index 431643942a..6a375b95bb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.core.accounting; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.utils.CommonConstants; @@ -34,7 +36,7 @@ public class CPUMemThreadLevelAccountingObjects { * Entry to track the task execution status and usage stats of a Thread * (including but not limited to server worker thread, runner thread, broker jetty thread, or broker netty thread) */ - public static class ThreadEntry { + public static class ThreadEntry implements ThreadResourceTracker { // current query_id, task_id of the thread; this field is accessed by the thread itself and the accountant AtomicReference<TaskEntry> _currentThreadTaskStatus = new AtomicReference<>(); // current sample of thread memory usage/cputime ; this field is accessed by the thread itself and the accountant @@ -77,11 +79,30 @@ public class CPUMemThreadLevelAccountingObjects { * * @return the current query id on the thread, {@code null} if idle */ + @JsonIgnore @Nullable public TaskEntry getCurrentThreadTaskStatus() { return _currentThreadTaskStatus.get(); } + public long 
getCPUTimeMS() { + return _currentThreadCPUTimeSampleMS; + } + + public long getAllocatedBytes() { + return _currentThreadMemoryAllocationSampleBytes; + } + + public String getQueryId() { + TaskEntry taskEntry = _currentThreadTaskStatus.get(); + return taskEntry == null ? "" : taskEntry.getQueryId(); + } + + public int getTaskId() { + TaskEntry taskEntry = _currentThreadTaskStatus.get(); + return taskEntry == null ? -1 : taskEntry.getTaskId(); + } + public void setThreadTaskStatus(@Nonnull String queryId, int taskId, @Nonnull Thread anchorThread) { _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, anchorThread)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index 598b68b344..34e496d3bf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.core.accounting; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -38,8 +40,10 @@ import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import 
org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.config.instance.InstanceType; @@ -162,6 +166,65 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory _watcherTask = new WatcherTask(); } + @Override + public Collection<? extends ThreadResourceTracker> getThreadResources() { + return _threadEntriesMap.values(); + } + + /** + * This function aggregates resource usage from all active threads and groups by queryId. + * It is inspired by {@link PerQueryCPUMemResourceUsageAccountant#aggregate}. The major difference is that + * it only reads from thread entries and does not update them. + * @return A map of query id, QueryResourceTracker. + */ + @Override + public Map<String, ? extends QueryResourceTracker> getQueryResources() { + HashMap<String, AggregatedStats> ret = new HashMap<>(); + + // for each {pqr, pqw} + for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) { + // sample current usage + CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue(); + long currentCPUSample = _isThreadCPUSamplingEnabled ? threadEntry._currentThreadCPUTimeSampleMS : 0; + long currentMemSample = + _isThreadMemorySamplingEnabled ? threadEntry._currentThreadMemoryAllocationSampleBytes : 0; + // sample current running task status + CPUMemThreadLevelAccountingObjects.TaskEntry currentTaskStatus = threadEntry.getCurrentThreadTaskStatus(); + Thread thread = entry.getKey(); + LOGGER.trace("tid: {}, task: {}", thread.getId(), currentTaskStatus); + + // if current thread is not idle + if (currentTaskStatus != null) { + // extract query id from queryTask string + String queryId = currentTaskStatus.getQueryId(); + if (queryId != null) { + Thread anchorThread = currentTaskStatus.getAnchorThread(); + boolean isAnchorThread = currentTaskStatus.isAnchorThread(); + ret.compute(queryId, + (k, v) -> v == null ? 
new AggregatedStats(currentCPUSample, currentMemSample, anchorThread, + isAnchorThread, threadEntry._errorStatus, queryId) + : v.merge(currentCPUSample, currentMemSample, isAnchorThread, threadEntry._errorStatus)); + } + } + } + + // if triggered, accumulate stats of finished tasks of each active query + for (Map.Entry<String, AggregatedStats> queryIdResult : ret.entrySet()) { + String activeQueryId = queryIdResult.getKey(); + long accumulatedCPUValue = + _isThreadCPUSamplingEnabled ? _finishedTaskCPUStatsAggregator.getOrDefault(activeQueryId, 0L) : 0; + long concurrentCPUValue = + _isThreadCPUSamplingEnabled ? _concurrentTaskCPUStatsAggregator.getOrDefault(activeQueryId, 0L) : 0; + long accumulatedMemValue = + _isThreadMemorySamplingEnabled ? _finishedTaskMemStatsAggregator.getOrDefault(activeQueryId, 0L) : 0; + long concurrentMemValue = + _isThreadMemorySamplingEnabled ? _concurrentTaskMemStatsAggregator.getOrDefault(activeQueryId, 0L) : 0; + queryIdResult.getValue() + .merge(accumulatedCPUValue + concurrentCPUValue, accumulatedMemValue + concurrentMemValue, false, null); + } + return ret; + } + @Override public void sampleUsage() { sampleThreadBytesAllocated(); @@ -402,7 +465,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory /** * aggregated usage of a query, _thread is the runner */ - protected static class AggregatedStats { + protected static class AggregatedStats implements QueryResourceTracker { final String _queryId; final Thread _anchorThread; boolean _isAnchorThread; @@ -432,14 +495,22 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory + '}'; } - public long getCpuNS() { - return _cpuNS; + @Override + public String getQueryId() { + return _queryId; } + @Override public long getAllocatedBytes() { return _allocatedBytes; } + @Override + public long getCpuTimeNs() { + return _cpuNS; + } + + @JsonIgnore public Thread getAnchorThread() { return _anchorThread; } @@ -804,7 +875,7 @@ public class 
PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } } else { maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(), - Comparator.comparing(AggregatedStats::getCpuNS)); + Comparator.comparing(AggregatedStats::getCpuTimeNs)); if (_oomKillQueryEnabled) { maxUsageTuple._exceptionAtomicReference .set(new RuntimeException(String.format( @@ -828,12 +899,12 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory AggregatedStats value = entry.getValue(); if (value._cpuNS > _cpuTimeBasedKillingThresholdNS) { LOGGER.error("Current task status recorded is {}. Query {} got picked because using {} ns of cpu time," - + " greater than threshold {}", _threadEntriesMap, value._queryId, value.getCpuNS(), + + " greater than threshold {}", _threadEntriesMap, value._queryId, value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS); value._exceptionAtomicReference.set(new RuntimeException( String.format("Query %s got killed on %s: %s because using %d " - + "CPU time exceeding limit of %d ns CPU time", - value._queryId, _instanceType, _instanceId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS))); + + "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId, + value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS))); interruptRunnerThread(value.getAnchorThread()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index 877041eb7d..51cab16711 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -34,6 +34,7 @@ public class Actions { public static final String CREATE_TENANT = "CreateTenant"; public static final String CREATE_USER = "CreateUser"; public static final String DEBUG_TASK = "DebugTask"; + public static final String DEBUG_RESOURCE_USAGE = "DebugResourceUsage"; public static final String 
DELETE_CLUSTER_CONFIG = "DeleteClusterConfig"; public static final String DELETE_INSTANCE = "DeleteInstance"; public static final String DELETE_TASK = "DeleteTask"; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java index b90f4b42a4..23f7d9a6cc 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java @@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,8 +53,12 @@ import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.accounting.QueryResourceTracker; +import org.apache.pinot.spi.accounting.ThreadResourceTracker; +import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.stream.ConsumerPartitionState; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import static org.apache.pinot.spi.utils.CommonConstants.DATABASE; @@ -123,6 +128,27 @@ public class DebugResource { } } + @GET + @Path("threads/resourceUsage") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get current resource usage of threads", + notes = "This is a debug endpoint, and won't maintain backward compatibility") + public Collection<? 
extends ThreadResourceTracker> getThreadUsage() { + ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant(); + return threadAccountant.getThreadResources(); + } + + @GET + @Path("queries/resourceUsage") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get current resource usage of queries in this service", + notes = "This is a debug endpoint, and won't maintain backward compatibility") + public Collection<? extends QueryResourceTracker> getQueryUsage() { + ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant(); + Collection<? extends QueryResourceTracker> resources = threadAccountant.getQueryResources().values(); + return resources; + } + private List<SegmentServerDebugInfo> getSegmentServerDebugInfo(String tableNameWithType, TableType tableType) { List<SegmentServerDebugInfo> segmentServerDebugInfos = new ArrayList<>(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java new file mode 100644 index 0000000000..869528bb75 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.accounting; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + + +/** + * Tracks allocated bytes and CPU time for a query in a server or a broker. + */ +@JsonSerialize +public interface QueryResourceTracker { + /** + * QueryId tracked by the implementation. + * @return a string containing the query id. + */ + String getQueryId(); + + /** + * Allocated bytes for a query in a server or broker + * @return A long containing the number of bytes allocated to execute the query. + */ + long getAllocatedBytes(); + + /** + * Total execution CPU Time(nanoseconds) of a query in a server or broker. + * @return A long containing the nanoseconds. + */ + long getCpuTimeNs(); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java new file mode 100644 index 0000000000..ff78a0d33c --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.accounting; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + + +/** + * Tracks allocated bytes and CPU time by a thread when executing a task of a query. + */ +@JsonSerialize +public interface ThreadResourceTracker { + /** + * Total execution CPU Time(nanoseconds) of a thread when executing a query task in a server or broker. + * @return A long containing the nanoseconds. + */ + long getCPUTimeMS(); + + /** + * Allocated bytes for a query task in a server or broker + * @return A long containing the number of bytes allocated to execute the query task. + */ + long getAllocatedBytes(); + + /** + * QueryId of the task the thread is executing. + * @return a string containing the query id. + */ + String getQueryId(); + + /** + * TaskId of the task the thread is executing. + * @return an int containing the task id. + */ + int getTaskId(); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index 41bbd59110..8be0632e6b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.spi.accounting; +import java.util.Collection; +import java.util.Map; import javax.annotation.Nullable; @@ -74,4 +76,16 @@ public interface ThreadResourceUsageAccountant { * @return empty string if N/A */ Exception getErrorStatus(); + + /** + * Get all the ThreadResourceTrackers for all threads executing query tasks + * @return A collection of ThreadResourceTracker objects + */ + Collection<? extends ThreadResourceTracker> getThreadResources(); + + /** + * Get all the QueryResourceTrackers for all the queries executing in a broker or server. + * @return A Map of String, QueryResourceTracker for all the queries. 
+ */ + Map<String, ? extends QueryResourceTracker> getQueryResources(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index a853b03c20..910ebc35cf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -20,9 +20,14 @@ package org.apache.pinot.spi.trace; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.env.PinotConfiguration; @@ -226,6 +231,16 @@ public class Tracing { public Exception getErrorStatus() { return null; } + + @Override + public Collection<? extends ThreadResourceTracker> getThreadResources() { + return Collections.emptyList(); + } + + @Override + public Map<String, ? extends QueryResourceTracker> getQueryResources() { + return Collections.emptyMap(); + } } /** diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java new file mode 100644 index 0000000000..a669011a18 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.tools.admin.PinotAdministrator; +import org.apache.pinot.tools.utils.SampleQueries; + + +public class MultiStageResourceTrackerQuickStart extends SingleStageResourceTrackingQuickStart { + @Override + protected List<String> getQueries() { + return List.of(SampleQueries.COUNT_BASEBALL_STATS, SampleQueries.BASEBALL_STATS_SELF_JOIN, + SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS); + } + + @Override + protected Map<String, String> getQueryOptions() { + return Collections.singletonMap("queryOptions", + CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE + "=true"); + } + + @Override + public List<String> types() { + return Collections.singletonList("MULTI_STAGE_RESOURCE_TRACKING"); + } + + public static void main(String[] args) + throws Exception { + List<String> arguments = new ArrayList<>(); + arguments.addAll(Arrays.asList("QuickStart", "-type", "MULTI_STAGE_RESOURCE_TRACKING")); + arguments.addAll(Arrays.asList(args)); + PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); + } +} diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java index 658aa694dc..e249c37c0b 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.tools.admin.PinotAdministrator; import org.apache.pinot.tools.admin.command.QuickstartRunner; +import org.apache.pinot.tools.utils.SampleQueries; public class MultistageEngineQuickStart extends Quickstart { @@ -58,28 +59,23 @@ public class MultistageEngineQuickStart extends Quickstart { printStatus(Quickstart.Color.YELLOW, "***** Multi-stage engine quickstart setup complete *****"); Map<String, String> queryOptions = Collections.singletonMap("queryOptions", CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE + "=true"); - String q1 = "SELECT count(*) FROM baseballStats_OFFLINE LIMIT 10"; printStatus(Quickstart.Color.YELLOW, "Total number of documents in the table"); - printStatus(Quickstart.Color.CYAN, "Query : " + q1); - printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q1, queryOptions))); + printStatus(Quickstart.Color.CYAN, "Query : " + SampleQueries.COUNT_BASEBALL_STATS); + printStatus(Quickstart.Color.YELLOW, + prettyPrintResponse(runner.runQuery(SampleQueries.COUNT_BASEBALL_STATS, queryOptions))); printStatus(Quickstart.Color.GREEN, "***************************************************"); - String q2 = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID" - + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID" - + " WHERE a.runs > 160 AND b.runs < 2 LIMIT 10"; printStatus(Quickstart.Color.YELLOW, "Correlate the same player(s) with more than 160-run some year(s) and" + " with less than 2-run some 
other year(s)"); - printStatus(Quickstart.Color.CYAN, "Query : " + q2); - printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q2, queryOptions))); + printStatus(Quickstart.Color.CYAN, "Query : " + SampleQueries.BASEBALL_STATS_SELF_JOIN); + printStatus(Quickstart.Color.YELLOW, + prettyPrintResponse(runner.runQuery(SampleQueries.BASEBALL_STATS_SELF_JOIN, queryOptions))); printStatus(Quickstart.Color.GREEN, "***************************************************"); - String q3 = "SELECT a.playerName, a.teamID, b.teamName \n" - + "FROM baseballStats_OFFLINE AS a\n" - + "JOIN dimBaseballTeams_OFFLINE AS b\n" - + "ON a.teamID = b.teamID LIMIT 10"; printStatus(Quickstart.Color.YELLOW, "Baseball Stats with joined team names"); - printStatus(Quickstart.Color.CYAN, "Query : " + q3); - printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q3, queryOptions))); + printStatus(Quickstart.Color.CYAN, "Query : " + SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS); + printStatus(Quickstart.Color.YELLOW, + prettyPrintResponse(runner.runQuery(SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS, queryOptions))); printStatus(Quickstart.Color.GREEN, "***************************************************"); String q4 = diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java new file mode 100644 index 0000000000..8fbc4fbdf6 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.pinot.tools.admin.PinotAdministrator; +import org.apache.pinot.tools.admin.command.QuickstartRunner; +import org.apache.pinot.tools.utils.PinotConfigUtils; +import org.apache.pinot.tools.utils.SampleQueries; + +public class SingleStageResourceTrackingQuickStart extends Quickstart { + @Override + protected Map<String, Object> getConfigOverrides() { + Map<String, Object> configOverrides = new HashMap<>(); + // Quickstart.getConfigOverrides may return an ImmutableMap. 
+ configOverrides.putAll(super.getConfigOverrides()); + configOverrides.putAll(PinotConfigUtils.getResourceTrackingConf()); + return configOverrides; + } + + @Override + public void runSampleQueries(QuickstartRunner runner) + throws Exception { + List<String> queries = getQueries(); + Map<String, String> queryOptions = getQueryOptions(); + + printStatus(Color.YELLOW, "***** Running queries for eternity *****"); + ExecutorService service = Executors.newFixedThreadPool(10); + + for (int i = 0; i < 10; i++) { + service.submit(() -> { + try { + while (true) { + for (String query : queries) { + runner.runQuery(query, queryOptions); + } + Thread.sleep(10); + } + } catch (Exception e) { + printStatus(Color.CYAN, e.getMessage()); + } + }); + } + } + + protected List<String> getQueries() { + return List.of(SampleQueries.COUNT_BASEBALL_STATS, + SampleQueries.BASEBALL_SUM_RUNS_Q1, + SampleQueries.BASEBALL_SUM_RUNS_Q2, + SampleQueries.BASEBALL_SUM_RUNS_Q3, + SampleQueries.BASEBALL_ORDER_BY_YEAR); + } + + protected Map<String, String> getQueryOptions() { + return Collections.emptyMap(); + } + + @Override + public List<String> types() { + return Collections.singletonList("SINGLE_STAGE_RESOURCE_TRACKING"); + } + + public static void main(String[] args) + throws Exception { + List<String> arguments = new ArrayList<>(); + arguments.addAll(Arrays.asList("QuickStart", "-type", "SINGLE_STAGE_RESOURCE_TRACKING")); + arguments.addAll(Arrays.asList(args)); + PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java index 77d0eee664..0a9e1cd31e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.configuration2.ex.ConfigurationException; 
import org.apache.commons.lang3.StringUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf; +import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.spi.env.CommonsConfigurationUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; @@ -215,6 +216,27 @@ public class PinotConfigUtils { return properties; } + /** Builds a new map of config overrides for resource tracking. It sets to true the Server and Broker keys for thread CPU-time measurement and thread allocated-bytes measurement, and the Accounting keys for thread CPU sampling and thread memory sampling. Then, under keys formed as PINOT_QUERY_SCHEDULER_PREFIX + "." + an Accounting key, it sets the factory name to the canonical class name of PerQueryCPUMemAccountantFactory, memory sampling to true, CPU sampling to false, and OOM-protection query killing to true. @return a fresh HashMap, so the caller may modify it */ public static Map<String, Object> getResourceTrackingConf() { + Map<String, Object> configOverrides = new HashMap<>(); + configOverrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true); + configOverrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true); + configOverrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true); + configOverrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true); + configOverrides.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true); + configOverrides.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true); + + /* NOTE(review): CPU sampling is set to true on the unprefixed Accounting key above but to false on the scheduler-prefixed key further down in this method; confirm which key is actually read and whether the difference is intended. */ configOverrides.put( + CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME, + PerQueryCPUMemAccountantFactory.class.getCanonicalName()); + configOverrides.put(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true); + configOverrides.put(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." 
+ + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false); + configOverrides.put(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true); + return configOverrides; + } + public static int getAvailablePort() { try { try (ServerSocket socket = new ServerSocket(0)) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java new file mode 100644 index 0000000000..8247de78b7 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tools.utils; + +/** Holder for SQL strings used as sample queries, for example by SingleStageResourceTrackingQuickStart. The class has only a private constructor and static constants. NOTE(review): some queries name the table baseballStats_OFFLINE and others baseballStats; confirm that both forms resolve as intended. */ public class SampleQueries { + /* Private constructor: the class only exposes constants. */ private SampleQueries() { + } + + /** Row count of baseballStats_OFFLINE. */ public final static String COUNT_BASEBALL_STATS = "SELECT count(*) FROM baseballStats_OFFLINE LIMIT 10"; + /** Self-join of baseballStats_OFFLINE on playerID, filtered on runs from each side. */ public final static String BASEBALL_STATS_SELF_JOIN = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID" + + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID" + + " WHERE a.runs > 160 AND b.runs < 2 LIMIT 10"; + /** Join of baseballStats_OFFLINE with dimBaseballTeams_OFFLINE on teamID. */ public final static String BASEBALL_JOIN_DIM_BASEBALL_TEAMS = + "SELECT a.playerName, a.teamID, b.teamName \n" + "FROM baseballStats_OFFLINE AS a\n" + + "JOIN dimBaseballTeams_OFFLINE AS b\n" + "ON a.teamID = b.teamID LIMIT 10"; + /** Five players with the highest summed runs over all rows. */ public final static String BASEBALL_SUM_RUNS_Q1 = + "select playerName, sum(runs) from baseballStats group by playerName order by sum(runs) desc limit 5"; + /** Five players with the highest summed runs where yearID equals 2000. */ public final static String BASEBALL_SUM_RUNS_Q2 = + "select playerName, sum(runs) from baseballStats where yearID=2000 group by playerName order by sum(runs) " + + "desc limit 5"; + /** Ten players with the highest summed runs where yearID is at least 2000. */ public final static String BASEBALL_SUM_RUNS_Q3 = + "select playerName, sum(runs) from baseballStats where yearID>=2000 group by playerName order by sum(runs) " + + "desc limit 10"; + /** Ten rows of playerName, runs and homeRuns ordered by yearID. */ public final static String BASEBALL_ORDER_BY_YEAR = + "select playerName, runs, homeRuns from baseballStats order by yearID limit 10"; +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org