This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch tracing-spi in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/tracing-spi by this push: new d53bff0574 make active recording available implicitly d53bff0574 is described below commit d53bff05740d4c85fa6042d533275dd11d011d1d Author: richardstartin <rich...@startree.ai> AuthorDate: Tue Apr 5 16:43:48 2022 +0100 make active recording available implicitly --- ...tsTest.java => OperatorExecutionStatsTest.java} | 2 +- .../apache/pinot/core/operator/BaseOperator.java | 7 +---- .../query/scheduler/resources/ResourceManager.java | 9 ++---- .../pinot/core/util/trace/DefaultTracer.java | 36 +++++++++++++++++++--- .../apache/pinot/core/util/trace/TracedThread.java | 31 +++++++++++++++++-- .../pinot/core/util/trace/TracedThreadFactory.java | 27 ++++++++++++++-- .../pinot/spi/trace/{Scope.java => Execution.java} | 2 +- .../apache/pinot/spi/trace/OperatorExecution.java | 2 +- .../spi/trace/{Scope.java => TraceContext.java} | 12 ++++++-- .../java/org/apache/pinot/spi/trace/Tracer.java | 5 +++ 10 files changed, 105 insertions(+), 28 deletions(-) diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java similarity index 99% rename from pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java rename to pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java index 476bea869e..41a4d3f1f3 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java @@ -29,7 +29,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; -public class ExecutionStatsTest { +public class OperatorExecutionStatsTest { private JsonNode _mockBrokerResponse; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java index 369f936e95..bfebb0a95d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java @@ -21,7 +21,6 @@ package org.apache.pinot.core.operator; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; import org.apache.pinot.spi.exception.EarlyTerminationException; -import org.apache.pinot.spi.trace.ExecutionRecording; import org.apache.pinot.spi.trace.OperatorExecution; import org.apache.pinot.spi.trace.Tracing; @@ -37,14 +36,10 @@ public abstract class BaseOperator<T extends Block> implements Operator<T> { throw new EarlyTerminationException(); } try (OperatorExecution execution = Tracing.getTracer().startOperatorExecution(getClass())) { - return getNextBlock(execution); + return getNextBlock(); } } // Make it protected because we should always call nextBlock() protected abstract T getNextBlock(); - - protected T getNextBlock(ExecutionRecording recording) { - return getNextBlock(); - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java index d1de9e8c88..c73db98537 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java @@ -21,12 +21,12 @@ package org.apache.pinot.core.query.scheduler.resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant; +import org.apache.pinot.core.util.trace.TracedThreadFactory; import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,15 +80,12 @@ public abstract class ResourceManager { LOGGER.info("Initializing with {} query runner threads and {} worker threads", _numQueryRunnerThreads, _numQueryWorkerThreads); // pqr -> pinot query runner (to give short names) - ThreadFactory queryRunnerFactory = - new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY).setNameFormat("pqr-%d") - .build(); + ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false, "pqr-%d"); _queryRunners = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory)); // pqw -> pinot query workers - ThreadFactory queryWorkersFactory = - new ThreadFactoryBuilder().setDaemon(false).setPriority(Thread.NORM_PRIORITY).setNameFormat("pqw-%d").build(); + ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false, "pqw-%d"); _queryWorkers = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java index 91522f2591..ed53b49580 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java @@ -18,10 +18,14 @@ */ package org.apache.pinot.core.util.trace; +import java.util.ArrayDeque; +import java.util.Deque; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.trace.ExecutionRecording; import org.apache.pinot.spi.trace.FilterType; import org.apache.pinot.spi.trace.OperatorExecution; import org.apache.pinot.spi.trace.Phase; +import org.apache.pinot.spi.trace.TraceContext; import org.apache.pinot.spi.trace.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +34,7 @@ import org.slf4j.LoggerFactory; public class DefaultTracer implements Tracer { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTracer.class); + private static final ThreadLocal<Deque<ExecutionRecording>> STACK = ThreadLocal.withInitial(ArrayDeque::new); private static class NoOpExecution implements OperatorExecution { @@ -72,9 +77,11 @@ public class DefaultTracer implements Tracer { private final long _startTimeMillis = System.currentTimeMillis(); private final Class<?> _operator; + private final Runnable _onClose; - public MillisExecution(Class<?> operator) { + public MillisExecution(Class<?> operator, Runnable onClose) { _operator = operator; + _onClose = onClose; } @Override @@ -84,17 +91,38 @@ public class DefaultTracer implements Tracer { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Time spent in {}: {}", operatorName, duration); } - TraceContext.logTime(operatorName, duration); + org.apache.pinot.core.util.trace.TraceContext.logTime(operatorName, duration); + _onClose.run(); } } @Override public void register(long requestId) { - TraceContext.register(requestId); + org.apache.pinot.core.util.trace.TraceContext.register(requestId); } @Override public OperatorExecution startOperatorExecution(Class<?> operatorClass) { - return TraceContext.traceEnabled() ? new MillisExecution(operatorClass) : NO_OP_SPAN; + if (org.apache.pinot.core.util.trace.TraceContext.traceEnabled()) { + Deque<ExecutionRecording> stack = getStack(); + MillisExecution execution = new MillisExecution(operatorClass, stack::removeLast); + stack.addLast(execution); + return execution; + } + return NO_OP_SPAN; + } + + @Override + public ExecutionRecording activeRecording() { + Deque<ExecutionRecording> stack = getStack(); + return stack.isEmpty() ? NO_OP_SPAN : stack.peekLast(); + } + + private Deque<ExecutionRecording> getStack() { + Thread thread = Thread.currentThread(); + if (thread instanceof TraceContext) { + return ((TraceContext) thread).getRecordings(); + } + return STACK.get(); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java similarity index 55% copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java copy to pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java index 4ff56d0b6c..f3f78a5382 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java @@ -16,10 +16,35 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.trace; +package org.apache.pinot.core.util.trace; -public interface Scope extends AutoCloseable { +import java.util.ArrayDeque; +import java.util.Deque; +import org.apache.pinot.spi.trace.ExecutionRecording; +import org.apache.pinot.spi.trace.TraceContext; + + +final class TracedThread extends Thread implements TraceContext { + + private long _traceId = Long.MIN_VALUE; + private final Deque<ExecutionRecording> _stack = new ArrayDeque<>(); + + public TracedThread(Runnable target) { + super(target); + } + + @Override + public void setTraceId(long traceId) { + _traceId = traceId; + } + + @Override + public Deque<ExecutionRecording> getRecordings() { + return _stack; + } @Override - void close(); + public long getTraceId() { + return _traceId; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java similarity index 51% copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java copy to pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java index 4ff56d0b6c..dda7bf7876 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java @@ -16,10 +16,31 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.trace; +package org.apache.pinot.core.util.trace; -public interface Scope extends AutoCloseable { +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + + +public final class TracedThreadFactory implements ThreadFactory { + + private final int _priority; + private final boolean _daemon; + private final String _nameFormat; + private final AtomicInteger _count = new AtomicInteger(); + + public TracedThreadFactory(int priority, boolean daemon, String nameFormat) { + _priority = priority; + _daemon = daemon; + _nameFormat = nameFormat; + } @Override - void close(); + public Thread newThread(Runnable task) { + Thread thread = new TracedThread(task); + thread.setPriority(_priority); + thread.setDaemon(_daemon); + thread.setName(String.format(_nameFormat, _count.getAndIncrement())); + return thread; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java similarity index 94% copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java copy to pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java index 4ff56d0b6c..40835ab32c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.spi.trace; -public interface Scope extends AutoCloseable { +public interface Execution extends AutoCloseable { @Override void close(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java index ec2eafd135..ff8a330e46 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java @@ -18,5 +18,5 @@ */ package org.apache.pinot.spi.trace; -public interface OperatorExecution extends Scope, ExecutionRecording { +public interface OperatorExecution extends Execution, ExecutionRecording { } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java similarity index 84% rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java index 4ff56d0b6c..0105a01ecc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java @@ -18,8 +18,14 @@ */ package org.apache.pinot.spi.trace; -public interface Scope extends AutoCloseable { +import java.util.Deque; - @Override - void close(); + +public interface TraceContext { + + long getTraceId(); + + void setTraceId(long traceId); + + Deque<ExecutionRecording> getRecordings(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java index 92a77b4985..ee0d89adf2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java @@ -27,4 +27,9 @@ public interface Tracer { void register(long requestId); OperatorExecution startOperatorExecution(Class<?> clazz); + + /** + * @return the active execution + */ + ExecutionRecording activeRecording(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org