This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b5afb89074 Make ThreadResourceUsageProvider a Helper/Utility Class. (#16051) b5afb89074 is described below commit b5afb890741909c84e327c1d5a088c685502373d Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Fri Jun 20 11:04:29 2025 +0530 Make ThreadResourceUsageProvider a Helper/Utility Class. (#16051) * ThreadResourceUsageProvider is a helper class. ThreadResourceContext tracks resource usage. Fix updateConcurrently * Rename to ThreadResourceSnapshot * Clean up * Add javadoc * Done use auto closeable * Checkstyle * Fix compilation error * Add back removed functions in SPI * Remove private constructor because japicmp complains. * Add setThreadResourceUsageProvider because of backward-incompatible checks * Add setThreadResourceUsageProvider because of backward-incompatible checks * Fix test * Fix ThreadResourceSnapshot usage and tests * Store cpu sample in nanoseconds. --- .../pinot/common/datatable/DataTableImplV4.java | 10 ++-- .../CPUMemThreadLevelAccountingObjects.java | 18 +++++++ .../PerQueryCPUMemAccountantFactory.java | 42 ++++++--------- .../core/operator/InstanceResponseOperator.java | 9 ++-- .../core/operator/combine/BaseCombineOperator.java | 9 ++-- .../pinot/core/transport/DataTableHandler.java | 6 ++- .../pinot/core/accounting/TestThreadMXBean.java | 41 +++++++------- .../perf/BenchmarkThreadResourceUsageProvider.java | 9 ++-- .../spi/accounting/ThreadResourceSnapshot.java | 63 ++++++++++++++++++++++ .../accounting/ThreadResourceUsageAccountant.java | 4 ++ .../accounting/ThreadResourceUsageProvider.java | 42 ++++++--------- .../java/org/apache/pinot/spi/trace/Tracing.java | 24 ++++++--- 12 files changed, 177 insertions(+), 100 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java index c97e6a06e3..f709f79a72 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.RoaringBitmapUtils; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -415,7 +416,7 @@ public class DataTableImplV4 implements DataTable { @Override public byte[] toBytes() throws IOException { - ThreadResourceUsageProvider threadTimer = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); @@ -425,13 +426,12 @@ public class DataTableImplV4 implements DataTable { // TODO: The check on cpu time and memory measurement is not needed. We can remove it. But keeping it around for // backward compatibility. if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) { - long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs(); - getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs)); + getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), + String.valueOf(resourceSnapshot.getCpuTimeNs())); } if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) { - long responseSerializationAllocatedBytes = threadTimer.getThreadAllocatedBytes(); getMetadata().put(MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES.getName(), - String.valueOf(responseSerializationAllocatedBytes)); + String.valueOf(resourceSnapshot.getAllocatedBytes())); } // Write metadata: length followed by actual metadata bytes. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java index eb7ac33e2b..94cd00eef5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.utils.CommonConstants; @@ -43,6 +44,9 @@ public class CPUMemThreadLevelAccountingObjects { volatile long _currentThreadCPUTimeSampleMS = 0; volatile long _currentThreadMemoryAllocationSampleBytes = 0; + // reference point for start time/bytes + private final ThreadResourceSnapshot _threadResourceSnapshot = new ThreadResourceSnapshot(); + // previous query_id, task_id of the thread, this field should only be accessed by the accountant TaskEntry _previousThreadTaskStatus = null; // previous cpu time and memory allocation of the thread @@ -113,6 +117,20 @@ public class CPUMemThreadLevelAccountingObjects { public void setThreadTaskStatus(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, @Nonnull Thread anchorThread) { _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, anchorThread)); + _threadResourceSnapshot.reset(); + } + + /** + * Note that the precision does not match the name of the variable. + * _currentThreadCPUTimeSampleMS is in nanoseconds, but the variable name suggests milliseconds. + * This is to maintain backward compatibility. It replaces code that set the value in nanoseconds. + */ + public void updateCpuSnapshot() { + _currentThreadCPUTimeSampleMS = _threadResourceSnapshot.getCpuTimeNs(); + } + + public void updateMemorySnapshot() { + _currentThreadMemoryAllocationSampleBytes = _threadResourceSnapshot.getAllocatedBytes(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index 2430ce7895..df110358cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -114,9 +114,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } ); - // ThreadResourceUsageProvider(ThreadMXBean wrapper) per runner/worker thread - private final ThreadLocal<ThreadResourceUsageProvider> _threadResourceUsageProvider; - // track thread cpu time private final boolean _isThreadCPUSamplingEnabled; @@ -168,9 +165,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE); LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE); - // ThreadMXBean wrapper - _threadResourceUsageProvider = new ThreadLocal<>(); - // task/query tracking _inactiveQuery = new HashSet<>(); @@ -277,19 +271,26 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } @Override - public void updateQueryUsageConcurrently(String queryId) { + @Deprecated + public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) { + } + + @Override + public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long memoryAllocatedBytes) { if (_isThreadCPUSamplingEnabled) { - long cpuUsageNS = getThreadResourceUsageProvider().getThreadTimeNs(); _concurrentTaskCPUStatsAggregator.compute(queryId, - (key, value) -> (value == null) ? cpuUsageNS : (value + cpuUsageNS)); + (key, value) -> (value == null) ? cpuTimeNs : (value + cpuTimeNs)); } if (_isThreadMemorySamplingEnabled) { - long memoryAllocatedBytes = getThreadResourceUsageProvider().getThreadAllocatedBytes(); _concurrentTaskMemStatsAggregator.compute(queryId, (key, value) -> (value == null) ? memoryAllocatedBytes : (value + memoryAllocatedBytes)); } } + @Override + @Deprecated + public void updateQueryUsageConcurrently(String queryId) { + } /** * The thread would need to do {@code setThreadResourceUsageProvider} first upon it is scheduled. @@ -297,9 +298,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory */ @SuppressWarnings("ConstantConditions") public void sampleThreadCPUTime() { - ThreadResourceUsageProvider provider = getThreadResourceUsageProvider(); - if (_isThreadCPUSamplingEnabled && provider != null) { - _threadLocalEntry.get()._currentThreadCPUTimeSampleMS = provider.getThreadTimeNs(); + if (_isThreadCPUSamplingEnabled) { + _threadLocalEntry.get().updateCpuSnapshot(); } } @@ -309,21 +309,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory */ @SuppressWarnings("ConstantConditions") public void sampleThreadBytesAllocated() { - ThreadResourceUsageProvider provider = getThreadResourceUsageProvider(); - if (_isThreadMemorySamplingEnabled && provider != null) { - _threadLocalEntry.get()._currentThreadMemoryAllocationSampleBytes = provider.getThreadAllocatedBytes(); + if (_isThreadMemorySamplingEnabled) { + _threadLocalEntry.get().updateMemorySnapshot(); } } - private ThreadResourceUsageProvider getThreadResourceUsageProvider() { - return _threadResourceUsageProvider.get(); - } - - @Override - public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) { - _threadResourceUsageProvider.set(threadResourceUsageProvider); - } - @Override public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType) { _threadLocalEntry.get()._errorStatus.set(null); @@ -362,8 +352,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = _threadLocalEntry.get(); // clear task info + stats threadEntry.setToIdle(); - // clear threadResourceUsageProvider - _threadResourceUsageProvider.remove(); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index 9e8fa43b41..6def833b06 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -30,7 +30,7 @@ import org.apache.pinot.core.operator.combine.BaseCombineOperator; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.SegmentContext; -import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryErrorMessage; @@ -102,13 +102,12 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock protected BaseResultsBlock getBaseBlock() { long startWallClockTimeNs = System.nanoTime(); - ThreadResourceUsageProvider mainThreadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot(); BaseResultsBlock resultsBlock = getCombinedResults(); - // No-ops if CPU time measurement and/or memory allocation measurements are not enabled. - long mainThreadCpuTimeNs = mainThreadResourceUsageProvider.getThreadTimeNs(); - long mainThreadMemAllocatedBytes = mainThreadResourceUsageProvider.getThreadAllocatedBytes(); + long mainThreadCpuTimeNs = resourceSnapshot.getCpuTimeNs(); + long mainThreadMemAllocatedBytes = resourceSnapshot.getAllocatedBytes(); long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 8a1a90dffe..c6a9070d6f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -34,7 +34,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.util.QueryMultiThreadingUtils; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.spi.accounting.ThreadExecutionContext; -import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryErrorMessage; @@ -103,8 +103,7 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba _futures[i] = _executorService.submit(new TraceRunnable() { @Override public void runJob() { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); - + ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot(); Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext); // Register the task to the phaser @@ -136,8 +135,8 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba Tracing.ThreadAccountantOps.clear(); } - _totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs()); - _totalWorkerThreadMemAllocatedBytes.getAndAdd(threadResourceUsageProvider.getThreadAllocatedBytes()); + _totalWorkerThreadCpuTimeNs.getAndAdd(resourceSnapshot.getCpuTimeNs()); + _totalWorkerThreadMemAllocatedBytes.getAndAdd(resourceSnapshot.getAllocatedBytes()); } }); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java index 3a117a4991..f5c9252d3a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java @@ -25,6 +25,7 @@ import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,16 +63,17 @@ public class DataTableHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { - Tracing.ThreadAccountantOps.setThreadResourceUsageProvider(); int responseSize = msg.readableBytes(); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_RECEIVED, responseSize); try { long deserializationStartTimeMs = System.currentTimeMillis(); + ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot(); DataTable dataTable = DataTableFactory.getDataTable(msg.nioBuffer()); _queryRouter.receiveDataTable(_serverRoutingInstance, dataTable, responseSize, (int) (System.currentTimeMillis() - deserializationStartTimeMs)); long requestID = Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName())); - Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID)); + Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID), + resourceSnapshot.getCpuTimeNs(), resourceSnapshot.getAllocatedBytes()); } catch (Exception e) { LOGGER.error("Caught exception while deserializing data table of size: {} from server: {}", responseSize, _serverRoutingInstance, e); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java index 6ca2a8c9e6..572c2afaa8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java @@ -26,6 +26,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Level; import org.apache.log4j.LogManager; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +50,11 @@ public class TestThreadMXBean { @Test public void testThreadMXBeanSimpleMemAllocTracking() { if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); long[] ll = new long[10000]; ll[2] = 4; LOGGER.trace(String.valueOf(ll[2])); - long result = threadResourceUsageProvider.getThreadAllocatedBytes(); + long result = threadResourceSnapshot.getAllocatedBytes(); Assert.assertTrue(result >= 80000 && result <= 85000); } } @@ -75,29 +76,29 @@ public class TestThreadMXBean { System.gc(); long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed(); - ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot0 = new ThreadResourceSnapshot(); executor.submit(() -> { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); for (int i = 0; i < 100000; i++) { concurrentHashMap.put(i, i); } - a.set(threadResourceUsageProvider.getThreadAllocatedBytes()); + a.set(threadResourceSnapshot.getAllocatedBytes()); }); executor.submit(() -> { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); for (int i = 100000; i < 200000; i++) { concurrentHashMap.put(i, i); } - b.set(threadResourceUsageProvider.getThreadAllocatedBytes()); + b.set(threadResourceSnapshot.getAllocatedBytes()); }); executor.submit(() -> { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); for (int i = 0; i < 200000; i++) { concurrentHashMap2.put(i, i); } - c.set(threadResourceUsageProvider.getThreadAllocatedBytes()); + c.set(threadResourceSnapshot.getAllocatedBytes()); }); try { @@ -105,7 +106,7 @@ public class TestThreadMXBean { } catch (InterruptedException ignored) { } - long d = threadResourceUsageProvider0.getThreadAllocatedBytes(); + long d = threadResourceSnapshot0.getAllocatedBytes(); long threadAllocatedBytes = a.get() + b.get() + c.get() + d; float heapUsedBytes = (float) memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev; float ratio = threadAllocatedBytes / heapUsedBytes; @@ -132,29 +133,29 @@ public class TestThreadMXBean { System.gc(); long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed(); - ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot0 = new ThreadResourceSnapshot(); executor.submit(() -> { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); for (int i = 0; i < 100; i++) { concurrentHashMap.put(i, new NestedArray()); } - a.set(threadResourceUsageProvider.getThreadAllocatedBytes()); + a.set(threadResourceSnapshot.getAllocatedBytes()); }); executor.submit(() -> { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); for (int i = 100; i < 200; i++) { concurrentHashMap.put(i, new NestedArray()); } - b.set(threadResourceUsageProvider.getThreadAllocatedBytes()); + b.set(threadResourceSnapshot.getAllocatedBytes()); }); executor.submit(() -> { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot = new ThreadResourceSnapshot(); for (int i = 0; i < 200; i++) { concurrentHashMap2.put(i, new NestedArray()); } - c.set(threadResourceUsageProvider.getThreadAllocatedBytes()); + c.set(threadResourceSnapshot.getAllocatedBytes()); }); try { @@ -162,7 +163,7 @@ public class TestThreadMXBean { } catch (InterruptedException ignored) { } - long d = threadResourceUsageProvider0.getThreadAllocatedBytes(); + long d = threadResourceSnapshot0.getAllocatedBytes(); long threadAllocatedBytes = a.get() + b.get() + c.get() + d; float heapUsedBytes = (float) memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev; float ratio = threadAllocatedBytes / heapUsedBytes; @@ -181,14 +182,14 @@ public class TestThreadMXBean { LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO); MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); System.gc(); - ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider(); + ThreadResourceSnapshot threadResourceSnapshot0 = new ThreadResourceSnapshot(); long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed(); for (int i = 0; i < 3; i++) { long[] ignored = new long[100000000]; } System.gc(); long heapResult = memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev; - long result = threadResourceUsageProvider0.getThreadAllocatedBytes(); + long result = threadResourceSnapshot0.getAllocatedBytes(); LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}", result, heapResult); } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java index ea82389e14..2c93f06771 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java @@ -19,6 +19,7 @@ package org.apache.pinot.perf; import java.util.concurrent.TimeUnit; +import org.apache.pinot.spi.accounting.ThreadResourceSnapshot; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -65,24 +66,24 @@ public class BenchmarkThreadResourceUsageProvider { @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void benchThreadMXBeanThreadCPUTime(MyState myState, Blackhole bh) { - bh.consume(myState._threadResourceUsageProvider.getThreadTimeNs()); + bh.consume(myState._threadResourceSnapshot.getCpuTimeNs()); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void benchThreadMXBeanMem(MyState myState, Blackhole bh) { - bh.consume(myState._threadResourceUsageProvider.getThreadAllocatedBytes()); + bh.consume(myState._threadResourceSnapshot.getAllocatedBytes()); } @State(Scope.Benchmark) public static class MyState { - ThreadResourceUsageProvider _threadResourceUsageProvider; + ThreadResourceSnapshot _threadResourceSnapshot; long[] _allocation; @Setup(Level.Iteration) public void doSetup() { - _threadResourceUsageProvider = new ThreadResourceUsageProvider(); + _threadResourceSnapshot = new ThreadResourceSnapshot(); } @Setup(Level.Invocation) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java new file mode 100644 index 0000000000..befad65a21 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.accounting; + +/** + * ThreadResourceSnapshot is a utility class that helps to track the CPU time and memory allocated. + * {@link ThreadResourceUsageProvider} provides cumulative CPU time and memory allocated for the current thread. + * This class uses that provider to snapshot start for a task executed by that thread. + */ +public class ThreadResourceSnapshot { + private long _startCpuTime; + private long _startAllocatedBytes; + + /** + * Creates a new tracker and takes initial snapshots. + */ + public ThreadResourceSnapshot() { + reset(); + } + + public void reset() { + _startCpuTime = ThreadResourceUsageProvider.getCurrentThreadCpuTime(); + _startAllocatedBytes = ThreadResourceUsageProvider.getCurrentThreadAllocatedBytes(); + } + + /** + * Gets the CPU time used so far in nanoseconds. + * This is the difference between the current CPU time and the start CPU time. + */ + public long getCpuTimeNs() { + return ThreadResourceUsageProvider.getCurrentThreadCpuTime() - _startCpuTime; + } + + /** + * Gets the memory allocated so far in bytes. + * This is the difference between the current allocated bytes and the start allocated bytes. + */ + public long getAllocatedBytes() { + return ThreadResourceUsageProvider.getCurrentThreadAllocatedBytes() - _startAllocatedBytes; + } + + @Override + public String toString() { + return "ThreadResourceSnapshot{" + "cpuTime=" + (getCpuTimeNs()) + ", allocatedBytes=" + + (getAllocatedBytes()) + '}'; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index 6a33ac3ac0..dd9a8e9c3f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -70,6 +70,7 @@ public interface ThreadResourceUsageAccountant { /** * set resource usage provider */ + @Deprecated void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider); /** @@ -87,6 +88,9 @@ public interface ThreadResourceUsageAccountant { * ser/de threads where the thread execution context cannot be setup before hands as * queryId/taskId is unknown and the execution process is hard to instrument */ + void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes); + + @Deprecated void updateQueryUsageConcurrently(String queryId); /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java index a103fe73c5..f5ebc5ed2f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java @@ -51,22 +51,28 @@ public class ThreadResourceUsageProvider { private static boolean _isThreadCpuTimeMeasurementEnabled = false; private static boolean _isThreadMemoryMeasurementEnabled = false; - // reference point for start time/bytes - private final long _startTimeNs; - private final long _startBytesAllocated; + @Deprecated + public long getThreadTimeNs() { + return 0; + } + + @Deprecated + public long getThreadAllocatedBytes() { + return 0; + } - public ThreadResourceUsageProvider() { - _startTimeNs = _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() : -1; + public static long getCurrentThreadCpuTime() { + return _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() : 0; + } - long startBytesAllocated1; + public static long getCurrentThreadAllocatedBytes() { try { - startBytesAllocated1 = _isThreadMemoryMeasurementEnabled - ? (long) SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD.invoke(MX_BEAN, Thread.currentThread().getId()) : -1; + return _isThreadMemoryMeasurementEnabled ? (long) SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD + .invoke(MX_BEAN, Thread.currentThread().getId()) : 0; } catch (IllegalAccessException | InvocationTargetException e) { - startBytesAllocated1 = -1; - LOGGER.error("Exception happened during the invocation of getting initial bytes allocated", e); + LOGGER.error("Exception happened during the invocation of getting current bytes allocated", e); + return 0; } - _startBytesAllocated = startBytesAllocated1; } public static boolean isThreadCpuTimeMeasurementEnabled() { @@ -100,20 +106,6 @@ public class ThreadResourceUsageProvider { _isThreadMemoryMeasurementEnabled = enable && IS_THREAD_ALLOCATED_MEMORY_SUPPORTED && isThreadAllocateMemoryEnabled; } - public long getThreadTimeNs() { - return _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() - _startTimeNs : 0; - } - - public long getThreadAllocatedBytes() { - try { - return _isThreadMemoryMeasurementEnabled ? (long) SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD - .invoke(MX_BEAN, Thread.currentThread().getId()) - _startBytesAllocated : 0; - } catch (IllegalAccessException | InvocationTargetException e) { - LOGGER.error("Exception happened during the invocation of getting initial bytes allocated", e); - return 0; - } - } - //initialize the com.sun.management.ThreadMXBean related variables using reflection static { Class<?> sunThreadMXBeanClass; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 198d083c57..551cf2968f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -191,19 +191,27 @@ public class Tracing { } @Override - public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) { + public void sampleUsage() { } @Override - public void sampleUsage() { + public void sampleUsageMSE() { } @Override - public void sampleUsageMSE() { + @Deprecated + public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) { } @Override + @Deprecated public void updateQueryUsageConcurrently(String queryId) { + // No-op for default accountant + } + + @Override + public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes) { + // No-op for default accountant } @Override @@ -256,7 +264,6 @@ public class Tracing { } public static void setupRunner(String queryId, ThreadExecutionContext.TaskType taskType) { - Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider()); Tracing.getThreadAccountant().setupRunner(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType); } @@ -276,7 +283,6 @@ public class Tracing { */ public static void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext threadExecutionContext) { - Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider()); Tracing.getThreadAccountant().setupWorker(taskId, taskType, threadExecutionContext); } @@ -326,12 +332,16 @@ public class Tracing { sample(); } + @Deprecated public static void updateQueryUsageConcurrently(String queryId) { - Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId); } + public static void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes) { + Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId, cpuTimeNs, allocatedBytes); + } + + @Deprecated public static void setThreadResourceUsageProvider() { - Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider()); } // Check for thread interruption, every time after merging 8192 keys --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org