This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b5afb89074 Make ThreadResourceUsageProvider a Helper/Utility Class. (#16051)
b5afb89074 is described below

commit b5afb890741909c84e327c1d5a088c685502373d
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Fri Jun 20 11:04:29 2025 +0530

    Make ThreadResourceUsageProvider a Helper/Utility Class. (#16051)
    
    * ThreadResourceUsageProvider is a helper class. ThreadResourceContext tracks resource usage.
    
    Fix updateConcurrently
    
    * Rename to ThreadResourceSnapshot
    
    * Clean up
    
    * Add javadoc
    
    * Don't use AutoCloseable
    
    * Checkstyle
    
    * Fix compilation error
    
    * Add back removed functions in SPI
    
    * Remove private constructor because japicmp complains.
    
    * Add setThreadResourceUsageProvider because of backward-incompatible checks
    
    * Add setThreadResourceUsageProvider because of backward-incompatible checks
    
    * Fix test
    
    * Fix ThreadResourceSnapshot usage and tests
    
    * Store cpu sample in nanoseconds.
---
 .../pinot/common/datatable/DataTableImplV4.java    | 10 ++--
 .../CPUMemThreadLevelAccountingObjects.java        | 18 +++++++
 .../PerQueryCPUMemAccountantFactory.java           | 42 ++++++---------
 .../core/operator/InstanceResponseOperator.java    |  9 ++--
 .../core/operator/combine/BaseCombineOperator.java |  9 ++--
 .../pinot/core/transport/DataTableHandler.java     |  6 ++-
 .../pinot/core/accounting/TestThreadMXBean.java    | 41 +++++++-------
 .../perf/BenchmarkThreadResourceUsageProvider.java |  9 ++--
 .../spi/accounting/ThreadResourceSnapshot.java     | 63 ++++++++++++++++++++++
 .../accounting/ThreadResourceUsageAccountant.java  |  4 ++
 .../accounting/ThreadResourceUsageProvider.java    | 42 ++++++---------
 .../java/org/apache/pinot/spi/trace/Tracing.java   | 24 ++++++---
 12 files changed, 177 insertions(+), 100 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index c97e6a06e3..f709f79a72 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -415,7 +416,7 @@ public class DataTableImplV4 implements DataTable {
   @Override
   public byte[] toBytes()
       throws IOException {
-    ThreadResourceUsageProvider threadTimer = new 
ThreadResourceUsageProvider();
+    ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
 
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
@@ -425,13 +426,12 @@ public class DataTableImplV4 implements DataTable {
     // TODO: The check on cpu time and memory measurement is not needed. We 
can remove it. But keeping it around for
     // backward compatibility.
     if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
-      getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), 
String.valueOf(responseSerializationCpuTimeNs));
+      getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
+          String.valueOf(resourceSnapshot.getCpuTimeNs()));
     }
     if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
-      long responseSerializationAllocatedBytes = 
threadTimer.getThreadAllocatedBytes();
       getMetadata().put(MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES.getName(),
-          String.valueOf(responseSerializationAllocatedBytes));
+          String.valueOf(resourceSnapshot.getAllocatedBytes()));
     }
 
     // Write metadata: length followed by actual metadata bytes.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index eb7ac33e2b..94cd00eef5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.utils.CommonConstants;
 
@@ -43,6 +44,9 @@ public class CPUMemThreadLevelAccountingObjects {
     volatile long _currentThreadCPUTimeSampleMS = 0;
     volatile long _currentThreadMemoryAllocationSampleBytes = 0;
 
+    // reference point for start time/bytes
+    private final ThreadResourceSnapshot _threadResourceSnapshot = new 
ThreadResourceSnapshot();
+
     // previous query_id, task_id of the thread, this field should only be 
accessed by the accountant
     TaskEntry _previousThreadTaskStatus = null;
     // previous cpu time and memory allocation of the thread
@@ -113,6 +117,20 @@ public class CPUMemThreadLevelAccountingObjects {
     public void setThreadTaskStatus(String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
         @Nonnull Thread anchorThread) {
       _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, 
anchorThread));
+      _threadResourceSnapshot.reset();
+    }
+
+    /**
+     * Note that the precision does not match the name of the variable.
+     * _currentThreadCPUTimeSampleMS is in nanoseconds, but the variable name 
suggests milliseconds.
+     * This is to maintain backward compatibility. It replaces code that set 
the value in nanoseconds.
+     */
+    public void updateCpuSnapshot() {
+      _currentThreadCPUTimeSampleMS = _threadResourceSnapshot.getCpuTimeNs();
+    }
+
+    public void updateMemorySnapshot() {
+      _currentThreadMemoryAllocationSampleBytes = 
_threadResourceSnapshot.getAllocatedBytes();
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 2430ce7895..df110358cb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -114,9 +114,6 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
         }
     );
 
-    // ThreadResourceUsageProvider(ThreadMXBean wrapper) per runner/worker 
thread
-    private final ThreadLocal<ThreadResourceUsageProvider> 
_threadResourceUsageProvider;
-
     // track thread cpu time
     private final boolean _isThreadCPUSamplingEnabled;
 
@@ -168,9 +165,6 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
               CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
       LOGGER.info("_isThreadSamplingEnabledForMSE: {}", 
_isThreadSamplingEnabledForMSE);
 
-      // ThreadMXBean wrapper
-      _threadResourceUsageProvider = new ThreadLocal<>();
-
       // task/query tracking
       _inactiveQuery = new HashSet<>();
 
@@ -277,19 +271,26 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     }
 
     @Override
-    public void updateQueryUsageConcurrently(String queryId) {
+    @Deprecated
+    public void setThreadResourceUsageProvider(ThreadResourceUsageProvider 
threadResourceUsageProvider) {
+    }
+
+    @Override
+    public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, 
long memoryAllocatedBytes) {
       if (_isThreadCPUSamplingEnabled) {
-        long cpuUsageNS = getThreadResourceUsageProvider().getThreadTimeNs();
         _concurrentTaskCPUStatsAggregator.compute(queryId,
-            (key, value) -> (value == null) ? cpuUsageNS : (value + 
cpuUsageNS));
+            (key, value) -> (value == null) ? cpuTimeNs : (value + cpuTimeNs));
       }
       if (_isThreadMemorySamplingEnabled) {
-        long memoryAllocatedBytes = 
getThreadResourceUsageProvider().getThreadAllocatedBytes();
         _concurrentTaskMemStatsAggregator.compute(queryId,
             (key, value) -> (value == null) ? memoryAllocatedBytes : (value + 
memoryAllocatedBytes));
       }
     }
 
+    @Override
+    @Deprecated
+    public void updateQueryUsageConcurrently(String queryId) {
+    }
 
     /**
      * The thread would need to do {@code setThreadResourceUsageProvider} 
first upon it is scheduled.
@@ -297,9 +298,8 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
      */
     @SuppressWarnings("ConstantConditions")
     public void sampleThreadCPUTime() {
-      ThreadResourceUsageProvider provider = getThreadResourceUsageProvider();
-      if (_isThreadCPUSamplingEnabled && provider != null) {
-        _threadLocalEntry.get()._currentThreadCPUTimeSampleMS = 
provider.getThreadTimeNs();
+      if (_isThreadCPUSamplingEnabled) {
+        _threadLocalEntry.get().updateCpuSnapshot();
       }
     }
 
@@ -309,21 +309,11 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
      */
     @SuppressWarnings("ConstantConditions")
     public void sampleThreadBytesAllocated() {
-      ThreadResourceUsageProvider provider = getThreadResourceUsageProvider();
-      if (_isThreadMemorySamplingEnabled && provider != null) {
-        _threadLocalEntry.get()._currentThreadMemoryAllocationSampleBytes = 
provider.getThreadAllocatedBytes();
+      if (_isThreadMemorySamplingEnabled) {
+        _threadLocalEntry.get().updateMemorySnapshot();
       }
     }
 
-    private ThreadResourceUsageProvider getThreadResourceUsageProvider() {
-      return _threadResourceUsageProvider.get();
-    }
-
-    @Override
-    public void setThreadResourceUsageProvider(ThreadResourceUsageProvider 
threadResourceUsageProvider) {
-      _threadResourceUsageProvider.set(threadResourceUsageProvider);
-    }
-
     @Override
     public void setupRunner(@Nullable String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType) {
       _threadLocalEntry.get()._errorStatus.set(null);
@@ -362,8 +352,6 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
_threadLocalEntry.get();
       // clear task info + stats
       threadEntry.setToIdle();
-      // clear threadResourceUsageProvider
-      _threadResourceUsageProvider.remove();
     }
 
     @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 9e8fa43b41..6def833b06 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -30,7 +30,7 @@ import 
org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.QueryErrorMessage;
@@ -102,13 +102,12 @@ public class InstanceResponseOperator extends 
BaseOperator<InstanceResponseBlock
 
   protected BaseResultsBlock getBaseBlock() {
     long startWallClockTimeNs = System.nanoTime();
-    ThreadResourceUsageProvider mainThreadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+    ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
 
     BaseResultsBlock resultsBlock = getCombinedResults();
 
-    // No-ops if CPU time measurement and/or memory allocation measurements 
are not enabled.
-    long mainThreadCpuTimeNs = 
mainThreadResourceUsageProvider.getThreadTimeNs();
-    long mainThreadMemAllocatedBytes = 
mainThreadResourceUsageProvider.getThreadAllocatedBytes();
+    long mainThreadCpuTimeNs = resourceSnapshot.getCpuTimeNs();
+    long mainThreadMemAllocatedBytes = resourceSnapshot.getAllocatedBytes();
 
     long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 8a1a90dffe..c6a9070d6f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -34,7 +34,7 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.QueryMultiThreadingUtils;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.QueryErrorMessage;
@@ -103,8 +103,7 @@ public abstract class BaseCombineOperator<T extends 
BaseResultsBlock> extends Ba
       _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
-
+          ThreadResourceSnapshot resourceSnapshot = new 
ThreadResourceSnapshot();
           Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
 
           // Register the task to the phaser
@@ -136,8 +135,8 @@ public abstract class BaseCombineOperator<T extends 
BaseResultsBlock> extends Ba
             Tracing.ThreadAccountantOps.clear();
           }
 
-          
_totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs());
-          
_totalWorkerThreadMemAllocatedBytes.getAndAdd(threadResourceUsageProvider.getThreadAllocatedBytes());
+          
_totalWorkerThreadCpuTimeNs.getAndAdd(resourceSnapshot.getCpuTimeNs());
+          
_totalWorkerThreadMemAllocatedBytes.getAndAdd(resourceSnapshot.getAllocatedBytes());
         }
       });
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
index 3a117a4991..f5c9252d3a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,16 +63,17 @@ public class DataTableHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    Tracing.ThreadAccountantOps.setThreadResourceUsageProvider();
     int responseSize = msg.readableBytes();
     
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
 responseSize);
     try {
       long deserializationStartTimeMs = System.currentTimeMillis();
+      ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
       DataTable dataTable = DataTableFactory.getDataTable(msg.nioBuffer());
       _queryRouter.receiveDataTable(_serverRoutingInstance, dataTable, 
responseSize,
           (int) (System.currentTimeMillis() - deserializationStartTimeMs));
       long requestID = 
Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName()));
-      
Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID));
+      
Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID),
+          resourceSnapshot.getCpuTimeNs(), 
resourceSnapshot.getAllocatedBytes());
     } catch (Exception e) {
       LOGGER.error("Caught exception while deserializing data table of size: 
{} from server: {}", responseSize,
           _serverRoutingInstance, e);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
index 6ca2a8c9e6..572c2afaa8 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +50,11 @@ public class TestThreadMXBean {
   @Test
   public void testThreadMXBeanSimpleMemAllocTracking() {
     if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
-      ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+      ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
       long[] ll = new long[10000];
       ll[2] = 4;
       LOGGER.trace(String.valueOf(ll[2]));
-      long result = threadResourceUsageProvider.getThreadAllocatedBytes();
+      long result = threadResourceSnapshot.getAllocatedBytes();
       Assert.assertTrue(result >= 80000 && result <= 85000);
     }
   }
@@ -75,29 +76,29 @@ public class TestThreadMXBean {
       System.gc();
 
       long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
-      ThreadResourceUsageProvider threadResourceUsageProvider0 = new 
ThreadResourceUsageProvider();
+      ThreadResourceSnapshot threadResourceSnapshot0 = new 
ThreadResourceSnapshot();
       executor.submit(() -> {
-        ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+        ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
         for (int i = 0; i < 100000; i++) {
           concurrentHashMap.put(i, i);
         }
-        a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+        a.set(threadResourceSnapshot.getAllocatedBytes());
       });
 
       executor.submit(() -> {
-        ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+        ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
         for (int i = 100000; i < 200000; i++) {
           concurrentHashMap.put(i, i);
         }
-        b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+        b.set(threadResourceSnapshot.getAllocatedBytes());
       });
 
       executor.submit(() -> {
-        ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+        ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
         for (int i = 0; i < 200000; i++) {
           concurrentHashMap2.put(i, i);
         }
-        c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+        c.set(threadResourceSnapshot.getAllocatedBytes());
       });
 
       try {
@@ -105,7 +106,7 @@ public class TestThreadMXBean {
       } catch (InterruptedException ignored) {
       }
 
-      long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
+      long d = threadResourceSnapshot0.getAllocatedBytes();
       long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
       float heapUsedBytes = (float) 
memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
       float ratio = threadAllocatedBytes / heapUsedBytes;
@@ -132,29 +133,29 @@ public class TestThreadMXBean {
       System.gc();
 
       long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
-      ThreadResourceUsageProvider threadResourceUsageProvider0 = new 
ThreadResourceUsageProvider();
+      ThreadResourceSnapshot threadResourceSnapshot0 = new 
ThreadResourceSnapshot();
       executor.submit(() -> {
-        ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+        ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
         for (int i = 0; i < 100; i++) {
           concurrentHashMap.put(i, new NestedArray());
         }
-        a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+        a.set(threadResourceSnapshot.getAllocatedBytes());
       });
 
       executor.submit(() -> {
-        ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+        ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
         for (int i = 100; i < 200; i++) {
           concurrentHashMap.put(i, new NestedArray());
         }
-        b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+        b.set(threadResourceSnapshot.getAllocatedBytes());
       });
 
       executor.submit(() -> {
-        ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+        ThreadResourceSnapshot threadResourceSnapshot = new 
ThreadResourceSnapshot();
         for (int i = 0; i < 200; i++) {
           concurrentHashMap2.put(i, new NestedArray());
         }
-        c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+        c.set(threadResourceSnapshot.getAllocatedBytes());
       });
 
       try {
@@ -162,7 +163,7 @@ public class TestThreadMXBean {
       } catch (InterruptedException ignored) {
       }
 
-      long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
+      long d = threadResourceSnapshot0.getAllocatedBytes();
       long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
       float heapUsedBytes = (float) 
memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
       float ratio = threadAllocatedBytes / heapUsedBytes;
@@ -181,14 +182,14 @@ public class TestThreadMXBean {
     LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
     System.gc();
-    ThreadResourceUsageProvider threadResourceUsageProvider0 = new 
ThreadResourceUsageProvider();
+    ThreadResourceSnapshot threadResourceSnapshot0 = new 
ThreadResourceSnapshot();
     long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
     for (int i = 0; i < 3; i++) {
       long[] ignored = new long[100000000];
     }
     System.gc();
     long heapResult = memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
-    long result = threadResourceUsageProvider0.getThreadAllocatedBytes();
+    long result = threadResourceSnapshot0.getAllocatedBytes();
     LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}",
         result, heapResult);
   }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
index ea82389e14..2c93f06771 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.perf;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -65,24 +66,24 @@ public class BenchmarkThreadResourceUsageProvider {
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public void benchThreadMXBeanThreadCPUTime(MyState myState, Blackhole bh) {
-    bh.consume(myState._threadResourceUsageProvider.getThreadTimeNs());
+    bh.consume(myState._threadResourceSnapshot.getCpuTimeNs());
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public void benchThreadMXBeanMem(MyState myState, Blackhole bh) {
-    bh.consume(myState._threadResourceUsageProvider.getThreadAllocatedBytes());
+    bh.consume(myState._threadResourceSnapshot.getAllocatedBytes());
   }
 
   @State(Scope.Benchmark)
   public static class MyState {
-    ThreadResourceUsageProvider _threadResourceUsageProvider;
+    ThreadResourceSnapshot _threadResourceSnapshot;
     long[] _allocation;
 
     @Setup(Level.Iteration)
     public void doSetup() {
-      _threadResourceUsageProvider = new ThreadResourceUsageProvider();
+      _threadResourceSnapshot = new ThreadResourceSnapshot();
     }
 
     @Setup(Level.Invocation)
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java
new file mode 100644
index 0000000000..befad65a21
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+/**
+ * ThreadResourceSnapshot is a utility class that helps to track the CPU time 
and memory allocated.
+ * {@link ThreadResourceUsageProvider} provides cumulative CPU time and memory 
allocated for the current thread.
+ * This class uses that provider to snapshot start for a task executed by that 
thread.
+ */
+public class ThreadResourceSnapshot {
+  private long _startCpuTime;
+  private long _startAllocatedBytes;
+
+  /**
+   * Creates a new tracker and takes initial snapshots.
+   */
+  public ThreadResourceSnapshot() {
+    reset();
+  }
+
+  public void reset() {
+    _startCpuTime = ThreadResourceUsageProvider.getCurrentThreadCpuTime();
+    _startAllocatedBytes = 
ThreadResourceUsageProvider.getCurrentThreadAllocatedBytes();
+  }
+
+  /**
+   * Gets the CPU time used so far in nanoseconds.
+   * This is the difference between the current CPU time and the start CPU 
time.
+   */
+  public long getCpuTimeNs() {
+    return ThreadResourceUsageProvider.getCurrentThreadCpuTime() - 
_startCpuTime;
+  }
+
+  /**
+   * Gets the memory allocated so far in bytes.
+   * This is the difference between the current allocated bytes and the start 
allocated bytes.
+   */
+  public long getAllocatedBytes() {
+    return ThreadResourceUsageProvider.getCurrentThreadAllocatedBytes() - 
_startAllocatedBytes;
+  }
+
+  @Override
+  public String toString() {
+    return "ThreadResourceSnapshot{" + "cpuTime=" + (getCpuTimeNs()) + ", 
allocatedBytes="
+        + (getAllocatedBytes()) + '}';
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 6a33ac3ac0..dd9a8e9c3f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -70,6 +70,7 @@ public interface ThreadResourceUsageAccountant {
   /**
    * set resource usage provider
    */
+  @Deprecated
   void setThreadResourceUsageProvider(ThreadResourceUsageProvider 
threadResourceUsageProvider);
 
   /**
@@ -87,6 +88,9 @@ public interface ThreadResourceUsageAccountant {
    * ser/de threads where the thread execution context cannot be setup before 
hands as
    * queryId/taskId is unknown and the execution process is hard to instrument
    */
+  void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long 
allocatedBytes);
+
+  @Deprecated
   void updateQueryUsageConcurrently(String queryId);
 
   /**
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
index a103fe73c5..f5ebc5ed2f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
@@ -51,22 +51,28 @@ public class ThreadResourceUsageProvider {
   private static boolean _isThreadCpuTimeMeasurementEnabled = false;
   private static boolean _isThreadMemoryMeasurementEnabled = false;
 
-  // reference point for start time/bytes
-  private final long _startTimeNs;
-  private final long _startBytesAllocated;
+  @Deprecated
+  public long getThreadTimeNs() {
+    return 0;
+  }
+
+  @Deprecated
+  public long getThreadAllocatedBytes() {
+    return 0;
+  }
 
-  public ThreadResourceUsageProvider() {
-    _startTimeNs = _isThreadCpuTimeMeasurementEnabled ? 
MX_BEAN.getCurrentThreadCpuTime() : -1;
+  public static long getCurrentThreadCpuTime() {
+    return _isThreadCpuTimeMeasurementEnabled ? 
MX_BEAN.getCurrentThreadCpuTime() : 0;
+  }
 
-    long startBytesAllocated1;
+  public static long getCurrentThreadAllocatedBytes() {
     try {
-      startBytesAllocated1 = _isThreadMemoryMeasurementEnabled
-          ? (long) 
SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD.invoke(MX_BEAN, 
Thread.currentThread().getId()) : -1;
+      return _isThreadMemoryMeasurementEnabled ? (long) 
SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD
+          .invoke(MX_BEAN, Thread.currentThread().getId()) : 0;
     } catch (IllegalAccessException | InvocationTargetException e) {
-      startBytesAllocated1 = -1;
-      LOGGER.error("Exception happened during the invocation of getting 
initial bytes allocated", e);
+      LOGGER.error("Exception happened during the invocation of getting 
current bytes allocated", e);
+      return 0;
     }
-    _startBytesAllocated = startBytesAllocated1;
   }
 
   public static boolean isThreadCpuTimeMeasurementEnabled() {
@@ -100,20 +106,6 @@ public class ThreadResourceUsageProvider {
     _isThreadMemoryMeasurementEnabled = enable && IS_THREAD_ALLOCATED_MEMORY_SUPPORTED && isThreadAllocateMemoryEnabled;
   }
 
-  public long getThreadTimeNs() {
-    return _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() - _startTimeNs : 0;
-  }
-
-  public long getThreadAllocatedBytes() {
-    try {
-      return _isThreadMemoryMeasurementEnabled ? (long) SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD
-          .invoke(MX_BEAN, Thread.currentThread().getId()) - _startBytesAllocated : 0;
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOGGER.error("Exception happened during the invocation of getting initial bytes allocated", e);
-      return 0;
-    }
-  }
-
   //initialize the com.sun.management.ThreadMXBean related variables using reflection
   static {
     Class<?> sunThreadMXBeanClass;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 198d083c57..551cf2968f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -191,19 +191,27 @@ public class Tracing {
     }
 
     @Override
-    public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
+    public void sampleUsage() {
     }
 
     @Override
-    public void sampleUsage() {
+    public void sampleUsageMSE() {
     }
 
     @Override
-    public void sampleUsageMSE() {
+    @Deprecated
+    public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
     }
 
     @Override
+    @Deprecated
     public void updateQueryUsageConcurrently(String queryId) {
+      // No-op for default accountant
+    }
+
+    @Override
+    public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes) {
+      // No-op for default accountant
     }
 
     @Override
@@ -256,7 +264,6 @@ public class Tracing {
     }
 
     public static void setupRunner(String queryId, ThreadExecutionContext.TaskType taskType) {
-      Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
       Tracing.getThreadAccountant().setupRunner(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType);
     }
 
@@ -276,7 +283,6 @@ public class Tracing {
      */
     public static void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
         @Nullable ThreadExecutionContext threadExecutionContext) {
-      Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
       Tracing.getThreadAccountant().setupWorker(taskId, taskType, threadExecutionContext);
     }
 
@@ -326,12 +332,16 @@ public class Tracing {
       sample();
     }
 
+    @Deprecated
     public static void updateQueryUsageConcurrently(String queryId) {
-      Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId);
     }
 
+    public static void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes) {
+      Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId, cpuTimeNs, allocatedBytes);
+    }
+
+    @Deprecated
     public static void setThreadResourceUsageProvider() {
-      Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
     }
 
     // Check for thread interruption, every time after merging 8192 keys


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to