This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch tracing-spi
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/tracing-spi by this push:
     new d53bff0574 make active recording available implicitly
d53bff0574 is described below

commit d53bff05740d4c85fa6042d533275dd11d011d1d
Author: richardstartin <rich...@startree.ai>
AuthorDate: Tue Apr 5 16:43:48 2022 +0100

    make active recording available implicitly
---
 ...tsTest.java => OperatorExecutionStatsTest.java} |  2 +-
 .../apache/pinot/core/operator/BaseOperator.java   |  7 +----
 .../query/scheduler/resources/ResourceManager.java |  9 ++----
 .../pinot/core/util/trace/DefaultTracer.java       | 36 +++++++++++++++++++---
 .../apache/pinot/core/util/trace/TracedThread.java | 31 +++++++++++++++++--
 .../pinot/core/util/trace/TracedThreadFactory.java | 27 ++++++++++++++--
 .../pinot/spi/trace/{Scope.java => Execution.java} |  2 +-
 .../apache/pinot/spi/trace/OperatorExecution.java  |  2 +-
 .../spi/trace/{Scope.java => TraceContext.java}    | 12 ++++++--
 .../java/org/apache/pinot/spi/trace/Tracer.java    |  5 +++
 10 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java
similarity index 99%
rename from pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
rename to pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java
index 476bea869e..41a4d3f1f3 100644
--- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
+++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java
@@ -29,7 +29,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
-public class ExecutionStatsTest {
+public class OperatorExecutionStatsTest {
 
   private JsonNode _mockBrokerResponse;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
index 369f936e95..bfebb0a95d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
-import org.apache.pinot.spi.trace.ExecutionRecording;
 import org.apache.pinot.spi.trace.OperatorExecution;
 import org.apache.pinot.spi.trace.Tracing;
 
@@ -37,14 +36,10 @@ public abstract class BaseOperator<T extends Block> implements Operator<T> {
       throw new EarlyTerminationException();
     }
     try (OperatorExecution execution = Tracing.getTracer().startOperatorExecution(getClass())) {
-      return getNextBlock(execution);
+      return getNextBlock();
     }
   }
 
   // Make it protected because we should always call nextBlock()
   protected abstract T getNextBlock();
-
-  protected T getNextBlock(ExecutionRecording recording) {
-    return getNextBlock();
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index d1de9e8c88..c73db98537 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -21,12 +21,12 @@ package org.apache.pinot.core.query.scheduler.resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.core.util.trace.TracedThreadFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,15 +80,12 @@ public abstract class ResourceManager {
     LOGGER.info("Initializing with {} query runner threads and {} worker threads", _numQueryRunnerThreads,
         _numQueryWorkerThreads);
     // pqr -> pinot query runner (to give short names)
-    ThreadFactory queryRunnerFactory =
-        new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY).setNameFormat("pqr-%d")
-            .build();
+    ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false, "pqr-%d");
     _queryRunners =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory));
 
     // pqw -> pinot query workers
-    ThreadFactory queryWorkersFactory =
-        new ThreadFactoryBuilder().setDaemon(false).setPriority(Thread.NORM_PRIORITY).setNameFormat("pqw-%d").build();
+    ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false, "pqw-%d");
     _queryWorkers =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
index 91522f2591..ed53b49580 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pinot.core.util.trace;
 
+import java.util.ArrayDeque;
+import java.util.Deque;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.trace.ExecutionRecording;
 import org.apache.pinot.spi.trace.FilterType;
 import org.apache.pinot.spi.trace.OperatorExecution;
 import org.apache.pinot.spi.trace.Phase;
+import org.apache.pinot.spi.trace.TraceContext;
 import org.apache.pinot.spi.trace.Tracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +34,7 @@ import org.slf4j.LoggerFactory;
 public class DefaultTracer implements Tracer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTracer.class);
+  private static final ThreadLocal<Deque<ExecutionRecording>> STACK = ThreadLocal.withInitial(ArrayDeque::new);
 
   private static class NoOpExecution implements OperatorExecution {
 
@@ -72,9 +77,11 @@ public class DefaultTracer implements Tracer {
 
     private final long _startTimeMillis = System.currentTimeMillis();
     private final Class<?> _operator;
+    private final Runnable _onClose;
 
-    public MillisExecution(Class<?> operator) {
+    public MillisExecution(Class<?> operator, Runnable onClose) {
       _operator = operator;
+      _onClose = onClose;
     }
 
     @Override
@@ -84,17 +91,38 @@ public class DefaultTracer implements Tracer {
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("Time spent in {}: {}", operatorName, duration);
       }
-      TraceContext.logTime(operatorName, duration);
+      org.apache.pinot.core.util.trace.TraceContext.logTime(operatorName, duration);
+      _onClose.run();
     }
   }
 
   @Override
   public void register(long requestId) {
-    TraceContext.register(requestId);
+    org.apache.pinot.core.util.trace.TraceContext.register(requestId);
   }
 
   @Override
   public OperatorExecution startOperatorExecution(Class<?> operatorClass) {
-    return TraceContext.traceEnabled() ? new MillisExecution(operatorClass) : NO_OP_SPAN;
+    if (org.apache.pinot.core.util.trace.TraceContext.traceEnabled()) {
+      Deque<ExecutionRecording> stack = getStack();
+      MillisExecution execution = new MillisExecution(operatorClass, stack::removeLast);
+      stack.addLast(execution);
+      return execution;
+    }
+    return NO_OP_SPAN;
+  }
+
+  @Override
+  public ExecutionRecording activeRecording() {
+    Deque<ExecutionRecording> stack = getStack();
+    return stack.isEmpty() ? NO_OP_SPAN : stack.peekLast();
+  }
+
+  private Deque<ExecutionRecording> getStack() {
+    Thread thread = Thread.currentThread();
+    if (thread instanceof TraceContext) {
+      return ((TraceContext) thread).getRecordings();
+    }
+    return STACK.get();
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java
similarity index 55%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
copy to pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java
index 4ff56d0b6c..f3f78a5382 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java
@@ -16,10 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.trace;
+package org.apache.pinot.core.util.trace;
 
-public interface Scope extends AutoCloseable {
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.pinot.spi.trace.ExecutionRecording;
+import org.apache.pinot.spi.trace.TraceContext;
+
+
+final class TracedThread extends Thread implements TraceContext {
+
+  private long _traceId = Long.MIN_VALUE;
+  private final Deque<ExecutionRecording> _stack = new ArrayDeque<>();
+
+  public TracedThread(Runnable target) {
+    super(target);
+  }
+
+  @Override
+  public void setTraceId(long traceId) {
+    _traceId = traceId;
+  }
+
+  @Override
+  public Deque<ExecutionRecording> getRecordings() {
+    return _stack;
+  }
 
   @Override
-  void close();
+  public long getTraceId() {
+    return _traceId;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
copy to pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java
index 4ff56d0b6c..dda7bf7876 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java
@@ -16,10 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.trace;
+package org.apache.pinot.core.util.trace;
 
-public interface Scope extends AutoCloseable {
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public final class TracedThreadFactory implements ThreadFactory {
+
+  private final int _priority;
+  private final boolean _daemon;
+  private final String _nameFormat;
+  private final AtomicInteger _count = new AtomicInteger();
+
+  public TracedThreadFactory(int priority, boolean daemon, String nameFormat) {
+    _priority = priority;
+    _daemon = daemon;
+    _nameFormat = nameFormat;
+  }
 
   @Override
-  void close();
+  public Thread newThread(Runnable task) {
+    Thread thread = new TracedThread(task);
+    thread.setPriority(_priority);
+    thread.setDaemon(_daemon);
+    thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
+    return thread;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java
similarity index 94%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java
index 4ff56d0b6c..40835ab32c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.spi.trace;
 
-public interface Scope extends AutoCloseable {
+public interface Execution extends AutoCloseable {
 
   @Override
   void close();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
index ec2eafd135..ff8a330e46 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
@@ -18,5 +18,5 @@
  */
 package org.apache.pinot.spi.trace;
 
-public interface OperatorExecution extends Scope, ExecutionRecording {
+public interface OperatorExecution extends Execution, ExecutionRecording {
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java
similarity index 84%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java
index 4ff56d0b6c..0105a01ecc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java
@@ -18,8 +18,14 @@
  */
 package org.apache.pinot.spi.trace;
 
-public interface Scope extends AutoCloseable {
+import java.util.Deque;
 
-  @Override
-  void close();
+
+public interface TraceContext {
+
+  long getTraceId();
+
+  void setTraceId(long traceId);
+
+  Deque<ExecutionRecording> getRecordings();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
index 92a77b4985..ee0d89adf2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
@@ -27,4 +27,9 @@ public interface Tracer {
     void register(long requestId);
 
     OperatorExecution startOperatorExecution(Class<?> clazz);
+
+    /**
+     * @return the active execution
+     */
+    ExecutionRecording activeRecording();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to