This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 43e1298e32 allow automatic tracing when a request is sampled by a registered tracer (#8629)
43e1298e32 is described below

commit 43e1298e322f2e66adb47c54b8bdfd2b7e57a552
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Wed May 4 09:35:19 2022 +0200

    allow automatic tracing when a request is sampled by a registered tracer (#8629)
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 95 +++++++++++-----------
 .../requesthandler/BrokerRequestHandler.java       |  4 +-
 .../requesthandler/GrpcBrokerRequestHandler.java   | 15 ++--
 .../SingleConnectionBrokerRequestHandler.java      | 10 ++-
 .../LiteralOnlyBrokerRequestTest.java              |  8 +-
 ...tStatistics.java => DefaultRequestContext.java} |  4 +-
 ...{RequestStatistics.java => RequestContext.java} |  6 +-
 .../org/apache/pinot/spi/trace/RequestScope.java   |  2 +-
 .../java/org/apache/pinot/spi/trace/Tracer.java    |  2 +-
 9 files changed, 78 insertions(+), 68 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5caa1c89aa..dedd9ebf0e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -76,7 +76,7 @@ import 
org.apache.pinot.spi.config.table.TimestampIndexGranularity;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
@@ -165,12 +165,12 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
   @Override
   public BrokerResponseNative handleRequest(JsonNode request, @Nullable 
RequesterIdentity requesterIdentity,
-      RequestStatistics requestStatistics)
+      RequestContext requestContext)
       throws Exception {
     long requestId = _requestIdGenerator.incrementAndGet();
-    requestStatistics.setBrokerId(_brokerId);
-    requestStatistics.setRequestId(requestId);
-    requestStatistics.setRequestArrivalTimeMillis(System.currentTimeMillis());
+    requestContext.setBrokerId(_brokerId);
+    requestContext.setRequestId(requestId);
+    requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
 
     // First-stage access control to prevent unauthenticated requests from 
using up resources. Secondary table-level
     // check comes later.
@@ -178,7 +178,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (!hasAccess) {
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
       LOGGER.info("Access denied for requestId {}", requestId);
-      requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+      requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
       return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
     }
 
@@ -186,14 +186,14 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (sql == null) {
       throw new BadQueryRequestException("Failed to find 'sql' in the request: 
" + request);
     }
-    return handleRequest(requestId, sql.asText(), request, requesterIdentity, 
requestStatistics);
+    return handleRequest(requestId, sql.asText(), request, requesterIdentity, 
requestContext);
   }
 
   private BrokerResponseNative handleRequest(long requestId, String query, 
JsonNode request,
-      @Nullable RequesterIdentity requesterIdentity, RequestStatistics 
requestStatistics)
+      @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
       throws Exception {
     LOGGER.debug("SQL query for request {}: {}", requestId, query);
-    requestStatistics.setQuery(query);
+    requestContext.setQuery(query);
 
     // Compile the request
     long compilationStartTimeNs = System.nanoTime();
@@ -203,7 +203,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
-      requestStatistics.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
     }
 
@@ -219,7 +219,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
           // EXPLAIN PLAN results to show that query is evaluated exclusively 
by Broker.
           return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
         }
-        return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, 
requestStatistics);
+        return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, 
requestContext);
       } catch (Exception e) {
         // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
         LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
@@ -228,18 +228,18 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
 
     try {
-      handleSubquery(pinotQuery, requestId, request, requesterIdentity, 
requestStatistics);
+      handleSubquery(pinotQuery, requestId, request, requesterIdentity, 
requestContext);
     } catch (Exception e) {
       LOGGER.info("Caught exception while handling the subquery in request {}: 
{}, {}", requestId, query,
           e.getMessage());
-      
requestStatistics.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+      requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
     }
 
     String tableName = 
getActualTableName(pinotQuery.getDataSource().getTableName());
     setTableName(serverBrokerRequest, tableName);
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    requestStatistics.setTableName(rawTableName);
+    requestContext.setTableName(rawTableName);
 
     try {
       boolean isCaseInsensitive = _tableCache.isCaseInsensitive();
@@ -252,7 +252,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       if (e instanceof BadQueryRequestException) {
         LOGGER.info("Caught exception while checking column names in request, 
{}: {}, {}", requestId, query,
             e.getMessage());
-        
requestStatistics.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+        requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
         _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
         return new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e));
       }
@@ -279,7 +279,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (!hasTableAccess) {
       _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
       LOGGER.info("Access denied for request {}: {}, table: {}", requestId, 
query, tableName);
-      requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+      requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
       return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
     }
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
@@ -320,11 +320,11 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       // No table matches the request
       if (realtimeTableConfig == null && offlineTableConfig == null) {
         LOGGER.info("Table not found for request {}: {}", requestId, query);
-        
requestStatistics.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
+        
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
         return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
       }
       LOGGER.info("No table matches for request {}: {}", requestId, query);
-      
requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
+      
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 
1);
       return BrokerResponseNative.NO_TABLE_RESULT;
     }
@@ -339,7 +339,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       String errorMessage =
           String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
       LOGGER.info(errorMessage);
-      
requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+      requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
       _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
     }
@@ -349,7 +349,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       validateRequest(pinotQuery, _queryResponseLimit);
     } catch (Exception e) {
       LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
-      
requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
       _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
     }
@@ -373,9 +373,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       handleExpressionOverride(realtimePinotQuery, 
_tableCache.getExpressionOverrideMap(realtimeTableName));
       handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
       _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, 
schema);
-      requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
-      
requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
-      
requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+      requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
+      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
+      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
     } else if (offlineTableName != null) {
       // OFFLINE only
       setTableName(serverBrokerRequest, offlineTableName);
@@ -383,8 +383,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       handleTimestampIndexOverride(pinotQuery, offlineTableConfig);
       _queryOptimizer.optimize(pinotQuery, offlineTableConfig, schema);
       offlineBrokerRequest = serverBrokerRequest;
-      requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
-      
requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
+      requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
+      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
     } else {
       // REALTIME only
       setTableName(serverBrokerRequest, realtimeTableName);
@@ -392,8 +392,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       handleTimestampIndexOverride(pinotQuery, realtimeTableConfig);
       _queryOptimizer.optimize(pinotQuery, realtimeTableConfig, schema);
       realtimeBrokerRequest = serverBrokerRequest;
-      requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
-      
requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+      requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
+      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
     }
 
     // Check if response can be send without server query evaluation.
@@ -417,7 +417,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
       // Send empty response since we don't need to evaluate either offline or 
realtime request.
       BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
-      logBrokerResponse(requestId, query, requestStatistics, brokerRequest, 0, 
new ServerStats(), brokerResponse,
+      logBrokerResponse(requestId, query, requestContext, brokerRequest, 0, 
new ServerStats(), brokerResponse,
           System.nanoTime());
 
       return brokerResponse;
@@ -471,7 +471,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       }
     }
     int numUnavailableSegments = unavailableSegments.size();
-    requestStatistics.setNumUnavailableSegments(numUnavailableSegments);
+    requestContext.setNumUnavailableSegments(numUnavailableSegments);
 
     List<ProcessingException> exceptions = new ArrayList<>();
     if (numUnavailableSegments > 0) {
@@ -530,7 +530,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
     BrokerResponseNative brokerResponse =
         processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, 
offlineBrokerRequest, offlineRoutingTable,
-            realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, 
serverStats, requestStatistics);
+            realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, 
serverStats, requestContext);
     brokerResponse.setExceptions(exceptions);
     long executionEndTimeNs = System.nanoTime();
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.QUERY_EXECUTION,
@@ -544,10 +544,10 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     // Set total query processing time
     long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - 
compilationStartTimeNs);
     brokerResponse.setTimeUsedMs(totalTimeMs);
-    requestStatistics.setQueryProcessingTime(totalTimeMs);
-    augmentStatistics(requestStatistics, brokerResponse);
+    requestContext.setQueryProcessingTime(totalTimeMs);
+    augmentStatistics(requestContext, brokerResponse);
 
-    logBrokerResponse(requestId, query, requestStatistics, brokerRequest, 
numUnavailableSegments, serverStats,
+    logBrokerResponse(requestId, query, requestContext, brokerRequest, 
numUnavailableSegments, serverStats,
         brokerResponse, totalTimeMs);
     return brokerResponse;
   }
@@ -627,7 +627,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     return TRUE.equals(brokerRequest.getPinotQuery().getFilterExpression());
   }
 
-  private void logBrokerResponse(long requestId, String query, 
RequestStatistics requestStatistics,
+  private void logBrokerResponse(long requestId, String query, RequestContext 
requestContext,
       BrokerRequest brokerRequest, int numUnavailableSegments, ServerStats 
serverStats,
       BrokerResponseNative brokerResponse, long totalTimeMs) {
     LOGGER.debug("Broker Response: {}", brokerResponse);
@@ -649,7 +649,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
           brokerResponse.getNumConsumingSegmentsQueried(), 
numUnavailableSegments,
           brokerResponse.getMinConsumingFreshnessTimeMs(), 
brokerResponse.getNumServersResponded(),
           brokerResponse.getNumServersQueried(), 
brokerResponse.isNumGroupsLimitReached(),
-          requestStatistics.getReduceTimeMillis(), 
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
+          requestContext.getReduceTimeMillis(), 
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
           brokerResponse.getOfflineTotalCpuTimeNs(), 
brokerResponse.getOfflineThreadCpuTimeNs(),
           brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
           brokerResponse.getOfflineResponseSerializationCpuTimeNs(), 
brokerResponse.getRealtimeTotalCpuTimeNs(),
@@ -687,11 +687,11 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
    * <p>Currently only supports subquery within the filter.
    */
   private void handleSubquery(PinotQuery pinotQuery, long requestId, JsonNode 
jsonRequest,
-      @Nullable RequesterIdentity requesterIdentity, RequestStatistics 
requestStatistics)
+      @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
       throws Exception {
     Expression filterExpression = pinotQuery.getFilterExpression();
     if (filterExpression != null) {
-      handleSubquery(filterExpression, requestId, jsonRequest, 
requesterIdentity, requestStatistics);
+      handleSubquery(filterExpression, requestId, jsonRequest, 
requesterIdentity, requestContext);
     }
   }
 
@@ -703,7 +703,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
    * IN_ID_SET transform function.
    */
   private void handleSubquery(Expression expression, long requestId, JsonNode 
jsonRequest,
-      @Nullable RequesterIdentity requesterIdentity, RequestStatistics 
requestStatistics)
+      @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
       throws Exception {
     Function function = expression.getFunctionCall();
     if (function == null) {
@@ -716,7 +716,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       Preconditions.checkState(subqueryLiteral != null, "Second argument of 
IN_SUBQUERY must be a literal (subquery)");
       String subquery = subqueryLiteral.getStringValue();
       BrokerResponseNative response =
-          handleRequest(requestId, subquery, jsonRequest, requesterIdentity, 
requestStatistics);
+          handleRequest(requestId, subquery, jsonRequest, requesterIdentity, 
requestContext);
       if (response.getExceptionsSize() != 0) {
         throw new RuntimeException("Caught exception while executing subquery: 
" + subquery);
       }
@@ -725,7 +725,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       operands.set(1, RequestUtils.getLiteralExpression(serializedIdSet));
     } else {
       for (Expression operand : operands) {
-        handleSubquery(operand, requestId, jsonRequest, requesterIdentity, 
requestStatistics);
+        handleSubquery(operand, requestId, jsonRequest, requesterIdentity, 
requestContext);
       }
     }
   }
@@ -1097,7 +1097,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
    * Processes the literal only query.
    */
   private BrokerResponseNative processLiteralOnlyQuery(PinotQuery pinotQuery, 
long compilationStartTimeNs,
-      RequestStatistics requestStatistics) {
+      RequestContext requestContext) {
     BrokerResponseNative brokerResponse = new BrokerResponseNative();
     List<String> columnNames = new ArrayList<>();
     List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>();
@@ -1114,8 +1114,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
     long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
compilationStartTimeNs);
     brokerResponse.setTimeUsedMs(totalTimeMs);
-    requestStatistics.setQueryProcessingTime(totalTimeMs);
-    augmentStatistics(requestStatistics, brokerResponse);
+    requestContext.setQueryProcessingTime(totalTimeMs);
+    augmentStatistics(requestContext, brokerResponse);
     return brokerResponse;
   }
 
@@ -1516,13 +1516,12 @@ public abstract class BaseBrokerRequestHandler 
implements BrokerRequestHandler {
    * Processes the optimized broker requests for both OFFLINE and REALTIME 
table.
    */
   protected abstract BrokerResponseNative processBrokerRequest(long requestId, 
BrokerRequest originalBrokerRequest,
-      BrokerRequest serverBrokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, 
@Nullable BrokerRequest realtimeBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long 
timeoutMs, ServerStats serverStats,
-      RequestStatistics requestStatistics)
+      BrokerRequest serverBrokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest, @Nullable Map<ServerInstance,
+      List<String>> offlineRoutingTable, @Nullable BrokerRequest 
realtimeBrokerRequest, @Nullable Map<ServerInstance,
+      List<String>> realtimeRoutingTable, long timeoutMs, ServerStats 
serverStats, RequestContext requestContext)
       throws Exception;
 
-  private static void augmentStatistics(RequestStatistics statistics, 
BrokerResponse response) {
+  private static void augmentStatistics(RequestContext statistics, 
BrokerResponse response) {
     statistics.setTotalDocs(response.getTotalDocs());
     statistics.setNumDocsScanned(response.getNumDocsScanned());
     
statistics.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 786aa8b324..93591dee71 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -23,7 +23,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.common.response.BrokerResponse;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
 
 
 @ThreadSafe
@@ -34,6 +34,6 @@ public interface BrokerRequestHandler {
   void shutDown();
 
   BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity 
requesterIdentity,
-      RequestStatistics requestStatistics)
+      RequestContext requestContext)
       throws Exception;
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index e2c7cf40b3..468add98c2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -41,7 +41,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
 
 
 /**
@@ -81,18 +81,20 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   protected BrokerResponseNative processBrokerRequest(long requestId, 
BrokerRequest originalBrokerRequest,
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest, @Nullable Map<ServerInstance,
       List<String>> offlineRoutingTable, @Nullable BrokerRequest 
realtimeBrokerRequest, @Nullable Map<ServerInstance,
-      List<String>> realtimeRoutingTable, long timeoutMs, ServerStats 
serverStats, RequestStatistics requestStatistics)
+      List<String>> realtimeRoutingTable, long timeoutMs, ServerStats 
serverStats, RequestContext requestContext)
       throws Exception {
     // TODO: Support failure detection
     assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
     Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = 
new HashMap<>();
     if (offlineBrokerRequest != null) {
       assert offlineRoutingTable != null;
-      sendRequest(TableType.OFFLINE, offlineBrokerRequest, 
offlineRoutingTable, responseMap);
+      sendRequest(TableType.OFFLINE, offlineBrokerRequest, 
offlineRoutingTable, responseMap,
+          requestContext.isSampledRequest());
     }
     if (realtimeBrokerRequest != null) {
       assert realtimeRoutingTable != null;
-      sendRequest(TableType.REALTIME, realtimeBrokerRequest, 
realtimeRoutingTable, responseMap);
+      sendRequest(TableType.REALTIME, realtimeBrokerRequest, 
realtimeRoutingTable, responseMap,
+          requestContext.isSampledRequest());
     }
     return 
_streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, 
responseMap, timeoutMs,
         _brokerMetrics);
@@ -103,7 +105,7 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
    */
   private void sendRequest(TableType tableType, BrokerRequest brokerRequest,
       Map<ServerInstance, List<String>> routingTable,
-      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap) 
{
+      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, 
boolean trace) {
     for (Map.Entry<ServerInstance, List<String>> routingEntry : 
routingTable.entrySet()) {
       ServerInstance serverInstance = routingEntry.getKey();
       List<String> segments = routingEntry.getValue();
@@ -111,7 +113,8 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       int port = serverInstance.getGrpcPort();
       // TODO: enable throttling on per host bases.
       Iterator<Server.ServerResponse> streamingResponse = 
_streamingQueryClient.submit(serverHost, port,
-          new 
GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true));
+          new 
GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true)
+              .setEnableTrace(trace));
       responseMap.put(serverInstance.toServerRoutingInstance(tableType, 
ServerInstance.RoutingType.GRPC),
           streamingResponse);
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index f16c12f893..2e399211cb 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -49,7 +49,8 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerResponse;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,9 +96,12 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest,
       @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, 
@Nullable BrokerRequest realtimeBrokerRequest,
       @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long 
timeoutMs, ServerStats serverStats,
-      RequestStatistics requestStatistics)
+      RequestContext requestContext)
       throws Exception {
     assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
+    if (requestContext.isSampledRequest()) {
+      
serverBrokerRequest.getPinotQuery().putToQueryOptions(CommonConstants.Broker.Request.TRACE,
 "true");
+    }
 
     String rawTableName = 
TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName());
     long scatterGatherStartTimeNs = System.nanoTime();
@@ -134,7 +138,7 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
         _brokerReduceService.reduceOnDataTable(originalBrokerRequest, 
serverBrokerRequest, dataTableMap,
             reduceTimeOutMs, _brokerMetrics);
     final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
-    requestStatistics.setReduceTimeNanos(reduceTimeNanos);
+    requestContext.setReduceTimeNanos(reduceTimeNanos);
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, 
reduceTimeNanos);
 
     brokerResponse.setNumServersQueried(numServersQueried);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index bee32f9432..704d397c35 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -129,7 +129,7 @@ public class LiteralOnlyBrokerRequestTest {
     RANDOM.nextBytes(randBytes);
     String ranStr = BytesUtils.toHexString(randBytes);
     JsonNode request = new 
ObjectMapper().readTree(String.format("{\"sql\":\"SELECT %d, '%s'\"}", randNum, 
ranStr));
-    RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
+    RequestContext requestStats = Tracing.getTracer().createRequestScope();
     BrokerResponseNative brokerResponse = 
requestHandler.handleRequest(request, null, requestStats);
     
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0),
 String.format("%d", randNum));
     
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnDataType(0),
@@ -154,7 +154,7 @@ public class LiteralOnlyBrokerRequestTest {
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = new ObjectMapper().readTree(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 
'yyyy-MM-dd z') as firstDayOf2020\"}");
-    RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
+    RequestContext requestStats = Tracing.getTracer().createRequestScope();
     BrokerResponseNative brokerResponse = 
requestHandler.handleRequest(request, null, requestStats);
     long currentTsMax = System.currentTimeMillis();
     
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0),
 "currentTs");
@@ -225,7 +225,7 @@ public class LiteralOnlyBrokerRequestTest {
     ObjectMapper objectMapper = new ObjectMapper();
     // Test 1: select constant
     JsonNode request = objectMapper.readTree("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
-    RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
+    RequestContext requestStats = Tracing.getTracer().createRequestScope();
     BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
 
     checkExplainResultSchema(brokerResponse.getResultTable().getDataSchema(),
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
similarity index 98%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 649688b0f2..60e1a1cca8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
  * This object can be used to publish the query processing statistics to a stream for
  * post-processing at a finer level than metrics.
  */
-public class DefaultRequestStatistics implements RequestScope {
+public class DefaultRequestContext implements RequestScope {
 
   private static final String DEFAULT_TABLE_NAME = "NotYetParsed";
 
@@ -64,7 +64,7 @@ public class DefaultRequestStatistics implements RequestScope {
   private FanoutType _fanoutType;
   private int _numUnavailableSegments;
 
-  public DefaultRequestStatistics() {
+  public DefaultRequestContext() {
   }
 
   @Override
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
similarity index 97%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index 9f47d348b0..89f768608b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.spi.trace;
 
-public interface RequestStatistics {
+public interface RequestContext {
   long getOfflineSystemActivitiesCpuTimeNs();
 
   void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs);
@@ -51,6 +51,10 @@ public interface RequestStatistics {
 
   long getRequestId();
 
+  default boolean isSampledRequest() {
+    return false;
+  }
+
   long getRequestArrivalTimeMillis();
 
   long getReduceTimeMillis();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
index 450ad20e23..3d1c7b052f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
@@ -22,5 +22,5 @@ package org.apache.pinot.spi.trace;
  * A scope wrapping an end to end synchronous pinot request.
  * Can be extended by a custom tracer to meter request latency.
  */
-public interface RequestScope extends Scope, RequestStatistics {
+public interface RequestScope extends Scope, RequestContext {
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
index bc448d0461..bf07b6f828 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
@@ -48,7 +48,7 @@ public interface Tracer {
      * @return the request record
      */
     default RequestScope createRequestScope() {
-        return new DefaultRequestStatistics();
+        return new DefaultRequestContext();
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to