This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 62b20d9decc Fix collision in trace (#17206)
62b20d9decc is described below
commit 62b20d9deccde4c31cef9f0b4cf6b016010ed408
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 14 12:02:25 2025 -0800
Fix collision in trace (#17206)
---
.../query/executor/ServerQueryExecutorV1Impl.java | 12 ++---
.../pinot/core/util/trace/BuiltInTracer.java | 4 +-
.../apache/pinot/core/util/trace/TraceContext.java | 45 ++++++++---------
.../pinot/core/util/trace/TraceContextTest.java | 57 +++++++++++-----------
.../java/org/apache/pinot/spi/trace/Tracer.java | 7 +--
.../java/org/apache/pinot/spi/trace/Tracing.java | 2 +-
6 files changed, 58 insertions(+), 69 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 5c4e805fe9f..949db6e7df8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -77,9 +77,9 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracer;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Server;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,16 +141,12 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
if (!queryRequest.isEnableTrace()) {
return executeInternal(queryRequest, executorService, streamer);
}
+ Tracer tracer = Tracing.getTracer();
+ tracer.register();
try {
- long requestId = queryRequest.getRequestId();
- // NOTE: Use negative request id as trace id for REALTIME table to
prevent id conflict when the same request
- // hitting both OFFLINE and REALTIME table (hybrid table setup)
- long traceId =
-
TableNameBuilder.isRealtimeTableResource(queryRequest.getTableNameWithType()) ?
-requestId : requestId;
- Tracing.getTracer().register(traceId);
return executeInternal(queryRequest, executorService, streamer);
} finally {
- Tracing.getTracer().unregister();
+ tracer.unregister();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
index bbd5b93bff8..1d4dc4fc734 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
@@ -62,8 +62,8 @@ public class BuiltInTracer implements Tracer {
}
@Override
- public void register(long requestId) {
- TraceContext.register(requestId);
+ public void register() {
+ TraceContext.register();
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
index 0bdaf0da626..eda7b9d4f33 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
@@ -28,15 +28,16 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.spi.utils.JsonUtils;
/**
* The main entry point for servers to record the trace information.
* <p>
- * To enable tracing, the request handler thread should register the requestId
by calling
- * {@link #register(long requestId)}.
+ * To enable tracing, the request handler thread should register the request
by calling {@link #register()}.
* <p>
* To trace the {@link Runnable} or {@link java.util.concurrent.Callable} jobs
the request handler creates and will be
* executed in other threads, use {@link TraceRunnable} or {@link
TraceCallable} instead.
@@ -99,37 +100,33 @@ public final class TraceContext {
* TraceEntry is a wrapper on the trace and the request Id it belongs to.
*/
static class TraceEntry {
- final long _requestId;
+ final long _id;
final Trace _trace;
- TraceEntry(long requestId, Trace trace) {
- _requestId = requestId;
+ TraceEntry(long id, Trace trace) {
+ _id = id;
_trace = trace;
}
}
- private static final ThreadLocal<TraceEntry> TRACE_ENTRY_THREAD_LOCAL = new
ThreadLocal<TraceEntry>() {
- @Override
- protected TraceEntry initialValue() {
- return null;
- }
- };
+ private static final ThreadLocal<TraceEntry> TRACE_ENTRY_THREAD_LOCAL = new
ThreadLocal<>();
- /**
- * Map from request Id to traces associated with the request.
- * <p>Requests may arrive simultaneously, so we need a concurrent map to
manage these requests.
- * <p>Each request may use multiple threads, so the queue should be
thread-safe as well.
- */
+ /// Map from id (unique for each request) to traces associated with the
request.
+ /// Requests may arrive simultaneously, so we need a concurrent map to
manage these requests.
+ /// Each request may use multiple threads, so the queue should be
thread-safe as well.
@VisibleForTesting
static final Map<Long, Queue<Trace>> REQUEST_TO_TRACES_MAP = new
ConcurrentHashMap<>();
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
/**
* Register a request to the trace.
* <p>Should be called before logging any trace information.
*/
- public static void register(long requestId) {
- REQUEST_TO_TRACES_MAP.put(requestId, new ConcurrentLinkedQueue<Trace>());
- registerThreadToRequest(new TraceEntry(requestId, null));
+ public static void register() {
+ long id = ID_GENERATOR.getAndIncrement();
+ REQUEST_TO_TRACES_MAP.put(id, new ConcurrentLinkedQueue<>());
+ registerThreadToRequest(new TraceEntry(id, null));
}
/**
@@ -137,8 +134,8 @@ public final class TraceContext {
*/
static void registerThreadToRequest(TraceEntry parentTraceEntry) {
Trace trace = new Trace(parentTraceEntry._trace);
- TRACE_ENTRY_THREAD_LOCAL.set(new TraceEntry(parentTraceEntry._requestId,
trace));
- Queue<Trace> traces =
REQUEST_TO_TRACES_MAP.get(parentTraceEntry._requestId);
+ TRACE_ENTRY_THREAD_LOCAL.set(new TraceEntry(parentTraceEntry._id, trace));
+ Queue<Trace> traces = REQUEST_TO_TRACES_MAP.get(parentTraceEntry._id);
if (traces != null) {
traces.add(trace);
}
@@ -150,7 +147,7 @@ public final class TraceContext {
*/
public static void unregister() {
TraceEntry traceEntry = TRACE_ENTRY_THREAD_LOCAL.get();
- REQUEST_TO_TRACES_MAP.remove(traceEntry._requestId);
+ REQUEST_TO_TRACES_MAP.remove(traceEntry._id);
unregisterThreadFromRequest();
}
@@ -189,8 +186,8 @@ public final class TraceContext {
*/
public static String getTraceInfo() {
ArrayNode jsonTraces = JsonUtils.newArrayNode();
- Queue<Trace> traces =
REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId);
- if (traces != null && !traces.isEmpty()) {
+ Queue<Trace> traces =
REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._id);
+ if (CollectionUtils.isNotEmpty(traces)) {
for (Trace trace : traces) {
jsonTraces.add(trace.toJson());
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
index ba4a4acb0c3..55504d2fb45 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
@@ -26,11 +26,11 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
public class TraceContextTest {
private static final int NUM_CHILDREN_PER_REQUEST = 5000;
@@ -43,24 +43,21 @@ public class TraceContextTest {
ExecutorService executorService = Executors.newCachedThreadPool();
testSingleRequest(executorService, 0);
executorService.shutdown();
- Assert.assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
+ assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
}
@Test
public void testMultipleRequests()
throws Exception {
- final ExecutorService executorService = Executors.newCachedThreadPool();
+ ExecutorService executorService = Executors.newCachedThreadPool();
Future[] futures = new Future[NUM_REQUESTS];
for (int i = 0; i < NUM_REQUESTS; i++) {
- final int requestId = i;
- futures[i] = executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- testSingleRequest(executorService, requestId);
- } catch (Exception e) {
- Assert.fail();
- }
+ int numLogs = i + 1;
+ futures[i] = executorService.submit(() -> {
+ try {
+ testSingleRequest(executorService, numLogs);
+ } catch (Exception e) {
+ fail();
}
});
}
@@ -68,33 +65,32 @@ public class TraceContextTest {
future.get();
}
executorService.shutdown();
- Assert.assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
+ assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
}
- private void testSingleRequest(ExecutorService executorService, final long
requestId)
+ private void testSingleRequest(ExecutorService executorService, int numLogs)
throws Exception {
- Set<String> expectedTraces = new HashSet<>(NUM_CHILDREN_PER_REQUEST + 1);
- Tracing.getTracer().register(requestId);
+ assertNull(TraceContext.getTraceEntry());
+ TraceContext.register();
String key = Integer.toString(RANDOM.nextInt());
int value = RANDOM.nextInt();
+ Set<String> expectedTraces = new HashSet<>();
expectedTraces.add(getTraceString(key, value));
- // Add (requestId + 1) logs
- for (int i = 0; i <= requestId; i++) {
+ for (int i = 0; i < numLogs; i++) {
TraceContext.logInfo(key, value);
}
Future[] futures = new Future[NUM_CHILDREN_PER_REQUEST];
for (int i = 0; i < NUM_CHILDREN_PER_REQUEST; i++) {
- final String chileKey = Integer.toString(RANDOM.nextInt());
- final int childValue = RANDOM.nextInt();
+ String chileKey = Integer.toString(RANDOM.nextInt());
+ int childValue = RANDOM.nextInt();
expectedTraces.add(getTraceString(chileKey, childValue));
futures[i] = executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
- // Add (requestId + 1) logs
- for (int j = 0; j <= requestId; j++) {
+ for (int j = 0; j < numLogs; j++) {
TraceContext.logInfo(chileKey, childValue);
}
}
@@ -105,20 +101,23 @@ public class TraceContextTest {
}
// to check uniqueness of traceIds
Set<String> traceIds = new HashSet<>();
- Queue<TraceContext.Trace> traces =
TraceContext.REQUEST_TO_TRACES_MAP.get(requestId);
- Assert.assertNotNull(traces);
- Assert.assertEquals(traces.size(), NUM_CHILDREN_PER_REQUEST + 1);
+ TraceContext.TraceEntry traceEntry = TraceContext.getTraceEntry();
+ assertNotNull(traceEntry);
+ Queue<TraceContext.Trace> traces =
TraceContext.REQUEST_TO_TRACES_MAP.get(traceEntry._id);
+ assertNotNull(traces);
+ assertEquals(traces.size(), NUM_CHILDREN_PER_REQUEST + 1);
for (TraceContext.Trace trace : traces) {
// Trace Id is not deterministic because it relies on the order of
runJob() getting called
List<TraceContext.Trace.LogEntry> logs = trace._logs;
traceIds.add(trace._traceId);
- Assert.assertEquals(logs.size(), requestId + 1);
+ assertEquals(logs.size(), numLogs);
for (TraceContext.Trace.LogEntry log : logs) {
- Assert.assertTrue(expectedTraces.contains(log.toJson().toString()));
+ assertTrue(expectedTraces.contains(log.toJson().toString()));
}
}
- Assert.assertEquals(traceIds.size(), NUM_CHILDREN_PER_REQUEST + 1);
+ assertEquals(traceIds.size(), NUM_CHILDREN_PER_REQUEST + 1);
TraceContext.unregister();
+ assertNull(TraceContext.getTraceEntry());
}
private static String getTraceString(String key, Object value) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
index 8990a2a3b76..ec6e5420ba7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
@@ -26,12 +26,9 @@ package org.apache.pinot.spi.trace;
public interface Tracer {
/**
- * Registers the requestId on the current thread. This means the request
will be traced.
- * TODO: Consider using string id or random id. Currently different broker
might send query with the same request id.
- *
- * @param requestId the requestId
+ * Registers the current thread for tracing.
*/
- void register(long requestId);
+ void register();
/**
* Detach a trace from the current thread.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 142de13be9a..316b8255164 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -88,7 +88,7 @@ public class Tracing {
static final FallbackTracer INSTANCE = new FallbackTracer();
@Override
- public void register(long requestId) {
+ public void register() {
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]