This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 62b20d9decc Fix collision in trace (#17206)
62b20d9decc is described below

commit 62b20d9deccde4c31cef9f0b4cf6b016010ed408
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 14 12:02:25 2025 -0800

    Fix collision in trace (#17206)
---
 .../query/executor/ServerQueryExecutorV1Impl.java  | 12 ++---
 .../pinot/core/util/trace/BuiltInTracer.java       |  4 +-
 .../apache/pinot/core/util/trace/TraceContext.java | 45 ++++++++---------
 .../pinot/core/util/trace/TraceContextTest.java    | 57 +++++++++++-----------
 .../java/org/apache/pinot/spi/trace/Tracer.java    |  7 +--
 .../java/org/apache/pinot/spi/trace/Tracing.java   |  2 +-
 6 files changed, 58 insertions(+), 69 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 5c4e805fe9f..949db6e7df8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -77,9 +77,9 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracer;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,16 +141,12 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     if (!queryRequest.isEnableTrace()) {
       return executeInternal(queryRequest, executorService, streamer);
     }
+    Tracer tracer = Tracing.getTracer();
+    tracer.register();
     try {
-      long requestId = queryRequest.getRequestId();
-      // NOTE: Use negative request id as trace id for REALTIME table to prevent id conflict when the same request
-      //       hitting both OFFLINE and REALTIME table (hybrid table setup)
-      long traceId =
-          TableNameBuilder.isRealtimeTableResource(queryRequest.getTableNameWithType()) ? -requestId : requestId;
-      Tracing.getTracer().register(traceId);
       return executeInternal(queryRequest, executorService, streamer);
     } finally {
-      Tracing.getTracer().unregister();
+      tracer.unregister();
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
index bbd5b93bff8..1d4dc4fc734 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java
@@ -62,8 +62,8 @@ public class BuiltInTracer implements Tracer {
   }
 
   @Override
-  public void register(long requestId) {
-    TraceContext.register(requestId);
+  public void register() {
+    TraceContext.register();
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
index 0bdaf0da626..eda7b9d4f33 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
@@ -28,15 +28,16 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
 /**
  * The main entry point for servers to record the trace information.
  * <p>
- * To enable tracing, the request handler thread should register the requestId by calling
- * {@link #register(long requestId)}.
+ * To enable tracing, the request handler thread should register the request by calling {@link #register()}.
  * <p>
  * To trace the {@link Runnable} or {@link java.util.concurrent.Callable} jobs the request handler creates and will be
  * executed in other threads, use {@link TraceRunnable} or {@link TraceCallable} instead.
@@ -99,37 +100,33 @@ public final class TraceContext {
    * TraceEntry is a wrapper on the trace and the request Id it belongs to.
    */
   static class TraceEntry {
-    final long _requestId;
+    final long _id;
     final Trace _trace;
 
-    TraceEntry(long requestId, Trace trace) {
-      _requestId = requestId;
+    TraceEntry(long id, Trace trace) {
+      _id = id;
       _trace = trace;
     }
   }
 
-  private static final ThreadLocal<TraceEntry> TRACE_ENTRY_THREAD_LOCAL = new ThreadLocal<TraceEntry>() {
-    @Override
-    protected TraceEntry initialValue() {
-      return null;
-    }
-  };
+  private static final ThreadLocal<TraceEntry> TRACE_ENTRY_THREAD_LOCAL = new ThreadLocal<>();
 
-  /**
-   * Map from request Id to traces associated with the request.
-   * <p>Requests may arrive simultaneously, so we need a concurrent map to manage these requests.
-   * <p>Each request may use multiple threads, so the queue should be thread-safe as well.
-   */
+  /// Map from id (unique for each request) to traces associated with the request.
+  /// Requests may arrive simultaneously, so we need a concurrent map to manage these requests.
+  /// Each request may use multiple threads, so the queue should be thread-safe as well.
   @VisibleForTesting
   static final Map<Long, Queue<Trace>> REQUEST_TO_TRACES_MAP = new ConcurrentHashMap<>();
 
+  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
   /**
    * Register a request to the trace.
    * <p>Should be called before logging any trace information.
    */
-  public static void register(long requestId) {
-    REQUEST_TO_TRACES_MAP.put(requestId, new ConcurrentLinkedQueue<Trace>());
-    registerThreadToRequest(new TraceEntry(requestId, null));
+  public static void register() {
+    long id = ID_GENERATOR.getAndIncrement();
+    REQUEST_TO_TRACES_MAP.put(id, new ConcurrentLinkedQueue<>());
+    registerThreadToRequest(new TraceEntry(id, null));
   }
 
   /**
@@ -137,8 +134,8 @@ public final class TraceContext {
    */
   static void registerThreadToRequest(TraceEntry parentTraceEntry) {
     Trace trace = new Trace(parentTraceEntry._trace);
-    TRACE_ENTRY_THREAD_LOCAL.set(new TraceEntry(parentTraceEntry._requestId, trace));
-    Queue<Trace> traces = REQUEST_TO_TRACES_MAP.get(parentTraceEntry._requestId);
+    TRACE_ENTRY_THREAD_LOCAL.set(new TraceEntry(parentTraceEntry._id, trace));
+    Queue<Trace> traces = REQUEST_TO_TRACES_MAP.get(parentTraceEntry._id);
     if (traces != null) {
       traces.add(trace);
     }
@@ -150,7 +147,7 @@ public final class TraceContext {
    */
   public static void unregister() {
     TraceEntry traceEntry = TRACE_ENTRY_THREAD_LOCAL.get();
-    REQUEST_TO_TRACES_MAP.remove(traceEntry._requestId);
+    REQUEST_TO_TRACES_MAP.remove(traceEntry._id);
     unregisterThreadFromRequest();
   }
 
@@ -189,8 +186,8 @@ public final class TraceContext {
    */
   public static String getTraceInfo() {
     ArrayNode jsonTraces = JsonUtils.newArrayNode();
-    Queue<Trace> traces = REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId);
-    if (traces != null && !traces.isEmpty()) {
+    Queue<Trace> traces = REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._id);
+    if (CollectionUtils.isNotEmpty(traces)) {
       for (Trace trace : traces) {
         jsonTraces.add(trace.toJson());
       }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
index ba4a4acb0c3..55504d2fb45 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/TraceContextTest.java
@@ -26,11 +26,11 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
+
 
 public class TraceContextTest {
   private static final int NUM_CHILDREN_PER_REQUEST = 5000;
@@ -43,24 +43,21 @@ public class TraceContextTest {
     ExecutorService executorService = Executors.newCachedThreadPool();
     testSingleRequest(executorService, 0);
     executorService.shutdown();
-    Assert.assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
+    assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
   }
 
   @Test
   public void testMultipleRequests()
       throws Exception {
-    final ExecutorService executorService = Executors.newCachedThreadPool();
+    ExecutorService executorService = Executors.newCachedThreadPool();
     Future[] futures = new Future[NUM_REQUESTS];
     for (int i = 0; i < NUM_REQUESTS; i++) {
-      final int requestId = i;
-      futures[i] = executorService.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            testSingleRequest(executorService, requestId);
-          } catch (Exception e) {
-            Assert.fail();
-          }
+      int numLogs = i + 1;
+      futures[i] = executorService.submit(() -> {
+        try {
+          testSingleRequest(executorService, numLogs);
+        } catch (Exception e) {
+          fail();
         }
       });
     }
@@ -68,33 +65,32 @@ public class TraceContextTest {
       future.get();
     }
     executorService.shutdown();
-    Assert.assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
+    assertTrue(TraceContext.REQUEST_TO_TRACES_MAP.isEmpty());
   }
 
-  private void testSingleRequest(ExecutorService executorService, final long requestId)
+  private void testSingleRequest(ExecutorService executorService, int numLogs)
       throws Exception {
-    Set<String> expectedTraces = new HashSet<>(NUM_CHILDREN_PER_REQUEST + 1);
-    Tracing.getTracer().register(requestId);
+    assertNull(TraceContext.getTraceEntry());
+    TraceContext.register();
     String key = Integer.toString(RANDOM.nextInt());
     int value = RANDOM.nextInt();
+    Set<String> expectedTraces = new HashSet<>();
     expectedTraces.add(getTraceString(key, value));
 
-    // Add (requestId + 1) logs
-    for (int i = 0; i <= requestId; i++) {
+    for (int i = 0; i < numLogs; i++) {
       TraceContext.logInfo(key, value);
     }
 
     Future[] futures = new Future[NUM_CHILDREN_PER_REQUEST];
     for (int i = 0; i < NUM_CHILDREN_PER_REQUEST; i++) {
-      final String chileKey = Integer.toString(RANDOM.nextInt());
-      final int childValue = RANDOM.nextInt();
+      String chileKey = Integer.toString(RANDOM.nextInt());
+      int childValue = RANDOM.nextInt();
       expectedTraces.add(getTraceString(chileKey, childValue));
 
       futures[i] = executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          // Add (requestId + 1) logs
-          for (int j = 0; j <= requestId; j++) {
+          for (int j = 0; j < numLogs; j++) {
             TraceContext.logInfo(chileKey, childValue);
           }
         }
@@ -105,20 +101,23 @@ public class TraceContextTest {
     }
     // to check uniqueness of traceIds
     Set<String> traceIds = new HashSet<>();
-    Queue<TraceContext.Trace> traces = TraceContext.REQUEST_TO_TRACES_MAP.get(requestId);
-    Assert.assertNotNull(traces);
-    Assert.assertEquals(traces.size(), NUM_CHILDREN_PER_REQUEST + 1);
+    TraceContext.TraceEntry traceEntry = TraceContext.getTraceEntry();
+    assertNotNull(traceEntry);
+    Queue<TraceContext.Trace> traces = TraceContext.REQUEST_TO_TRACES_MAP.get(traceEntry._id);
+    assertNotNull(traces);
+    assertEquals(traces.size(), NUM_CHILDREN_PER_REQUEST + 1);
     for (TraceContext.Trace trace : traces) {
       // Trace Id is not deterministic because it relies on the order of runJob() getting called
       List<TraceContext.Trace.LogEntry> logs = trace._logs;
       traceIds.add(trace._traceId);
-      Assert.assertEquals(logs.size(), requestId + 1);
+      assertEquals(logs.size(), numLogs);
       for (TraceContext.Trace.LogEntry log : logs) {
-        Assert.assertTrue(expectedTraces.contains(log.toJson().toString()));
+        assertTrue(expectedTraces.contains(log.toJson().toString()));
       }
     }
-    Assert.assertEquals(traceIds.size(), NUM_CHILDREN_PER_REQUEST + 1);
+    assertEquals(traceIds.size(), NUM_CHILDREN_PER_REQUEST + 1);
     TraceContext.unregister();
+    assertNull(TraceContext.getTraceEntry());
   }
 
   private static String getTraceString(String key, Object value) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
index 8990a2a3b76..ec6e5420ba7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
@@ -26,12 +26,9 @@ package org.apache.pinot.spi.trace;
 public interface Tracer {
 
   /**
-   * Registers the requestId on the current thread. This means the request will be traced.
-   * TODO: Consider using string id or random id. Currently different broker might send query with the same request id.
-   *
-   * @param requestId the requestId
+   * Registers the current thread for tracing.
    */
-  void register(long requestId);
+  void register();
 
   /**
    * Detach a trace from the current thread.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 142de13be9a..316b8255164 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -88,7 +88,7 @@ public class Tracing {
     static final FallbackTracer INSTANCE = new FallbackTracer();
 
     @Override
-    public void register(long requestId) {
+    public void register() {
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to