This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 43e1298e32 allow automatic tracing when a request is sampled by a registered tracer (#8629) 43e1298e32 is described below commit 43e1298e322f2e66adb47c54b8bdfd2b7e57a552 Author: Richard Startin <rich...@startree.ai> AuthorDate: Wed May 4 09:35:19 2022 +0200 allow automatic tracing when a request is sampled by a registered tracer (#8629) --- .../requesthandler/BaseBrokerRequestHandler.java | 95 +++++++++++----------- .../requesthandler/BrokerRequestHandler.java | 4 +- .../requesthandler/GrpcBrokerRequestHandler.java | 15 ++-- .../SingleConnectionBrokerRequestHandler.java | 10 ++- .../LiteralOnlyBrokerRequestTest.java | 8 +- ...tStatistics.java => DefaultRequestContext.java} | 4 +- ...{RequestStatistics.java => RequestContext.java} | 6 +- .../org/apache/pinot/spi/trace/RequestScope.java | 2 +- .../java/org/apache/pinot/spi/trace/Tracer.java | 2 +- 9 files changed, 78 insertions(+), 68 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 5caa1c89aa..dedd9ebf0e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -76,7 +76,7 @@ import org.apache.pinot.spi.config.table.TimestampIndexGranularity; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.BadQueryRequestException; -import org.apache.pinot.spi.trace.RequestStatistics; +import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker; @@ -165,12 +165,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { @Override public BrokerResponseNative handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, - RequestStatistics requestStatistics) + RequestContext requestContext) throws Exception { long requestId = _requestIdGenerator.incrementAndGet(); - requestStatistics.setBrokerId(_brokerId); - requestStatistics.setRequestId(requestId); - requestStatistics.setRequestArrivalTimeMillis(System.currentTimeMillis()); + requestContext.setBrokerId(_brokerId); + requestContext.setRequestId(requestId); + requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis()); // First-stage access control to prevent unauthenticated requests from using up resources. Secondary table-level // check comes later. @@ -178,7 +178,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (!hasAccess) { _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); LOGGER.info("Access denied for requestId {}", requestId); - requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); + requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR); } @@ -186,14 +186,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (sql == null) { throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); } - return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestStatistics); + return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext); } private BrokerResponseNative handleRequest(long requestId, String query, JsonNode request, - @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics) + @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception { LOGGER.debug("SQL query for request {}: {}", requestId, query); - requestStatistics.setQuery(query); + requestContext.setQuery(query); // Compile the request long compilationStartTimeNs = System.nanoTime(); @@ -203,7 +203,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } catch (Exception e) { LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); - requestStatistics.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); } @@ -219,7 +219,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; } - return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, requestStatistics); + return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, requestContext); } catch (Exception e) { // TODO: refine the exceptions here to early termination the queries won't requires to send to servers. LOGGER.warn("Unable to execute literal request {}: {} at broker, fallback to server query. {}", requestId, @@ -228,18 +228,18 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } try { - handleSubquery(pinotQuery, requestId, request, requesterIdentity, requestStatistics); + handleSubquery(pinotQuery, requestId, request, requesterIdentity, requestContext); } catch (Exception e) { LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, e.getMessage()); - requestStatistics.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); + requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } String tableName = getActualTableName(pinotQuery.getDataSource().getTableName()); setTableName(serverBrokerRequest, tableName); String rawTableName = TableNameBuilder.extractRawTableName(tableName); - requestStatistics.setTableName(rawTableName); + requestContext.setTableName(rawTableName); try { boolean isCaseInsensitive = _tableCache.isCaseInsensitive(); @@ -252,7 +252,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (e instanceof BadQueryRequestException) { LOGGER.info("Caught exception while checking column names in request, {}: {}, {}", requestId, query, e.getMessage()); - requestStatistics.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE); + requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); return new BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR, e)); } @@ -279,7 +279,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (!hasTableAccess) { _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); LOGGER.info("Access denied for request {}: {}, table: {}", requestId, query, tableName); - requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); + requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR); } _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION, @@ -320,11 +320,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // No table matches the request if (realtimeTableConfig == null && offlineTableConfig == null) { LOGGER.info("Table not found for request {}: {}", requestId, query); - requestStatistics.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE); + requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE); return BrokerResponseNative.TABLE_DOES_NOT_EXIST; } LOGGER.info("No table matches for request {}: {}", requestId, query); - requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE); + requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1); return BrokerResponseNative.NO_TABLE_RESULT; } @@ -339,7 +339,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { String errorMessage = String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); LOGGER.info(errorMessage); - requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); + requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } @@ -349,7 +349,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { validateRequest(pinotQuery, _queryResponseLimit); } catch (Exception e) { LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); - requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); } @@ -373,9 +373,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { handleExpressionOverride(realtimePinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName)); handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig); _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, schema); - requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID); - requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName)); - requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName)); + requestContext.setFanoutType(RequestContext.FanoutType.HYBRID); + requestContext.setOfflineServerTenant(getServerTenant(offlineTableName)); + requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName)); } else if (offlineTableName != null) { // OFFLINE only setTableName(serverBrokerRequest, offlineTableName); @@ -383,8 +383,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { handleTimestampIndexOverride(pinotQuery, offlineTableConfig); _queryOptimizer.optimize(pinotQuery, offlineTableConfig, schema); offlineBrokerRequest = serverBrokerRequest; - requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE); - requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName)); + requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE); + requestContext.setOfflineServerTenant(getServerTenant(offlineTableName)); } else { // REALTIME only setTableName(serverBrokerRequest, realtimeTableName); @@ -392,8 +392,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { handleTimestampIndexOverride(pinotQuery, realtimeTableConfig); _queryOptimizer.optimize(pinotQuery, realtimeTableConfig, schema); realtimeBrokerRequest = serverBrokerRequest; - requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME); - requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName)); + requestContext.setFanoutType(RequestContext.FanoutType.REALTIME); + requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName)); } // Check if response can be send without server query evaluation. @@ -417,7 +417,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Send empty response since we don't need to evaluate either offline or realtime request. BrokerResponseNative brokerResponse = BrokerResponseNative.empty(); - logBrokerResponse(requestId, query, requestStatistics, brokerRequest, 0, new ServerStats(), brokerResponse, + logBrokerResponse(requestId, query, requestContext, brokerRequest, 0, new ServerStats(), brokerResponse, System.nanoTime()); return brokerResponse; @@ -471,7 +471,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } } int numUnavailableSegments = unavailableSegments.size(); - requestStatistics.setNumUnavailableSegments(numUnavailableSegments); + requestContext.setNumUnavailableSegments(numUnavailableSegments); List<ProcessingException> exceptions = new ArrayList<>(); if (numUnavailableSegments > 0) { @@ -530,7 +530,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } BrokerResponseNative brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, - realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestStatistics); + realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); brokerResponse.setExceptions(exceptions); long executionEndTimeNs = System.nanoTime(); _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_EXECUTION, @@ -544,10 +544,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Set total query processing time long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs); brokerResponse.setTimeUsedMs(totalTimeMs); - requestStatistics.setQueryProcessingTime(totalTimeMs); - augmentStatistics(requestStatistics, brokerResponse); + requestContext.setQueryProcessingTime(totalTimeMs); + augmentStatistics(requestContext, brokerResponse); - logBrokerResponse(requestId, query, requestStatistics, brokerRequest, numUnavailableSegments, serverStats, + logBrokerResponse(requestId, query, requestContext, brokerRequest, numUnavailableSegments, serverStats, brokerResponse, totalTimeMs); return brokerResponse; } @@ -627,7 +627,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { return TRUE.equals(brokerRequest.getPinotQuery().getFilterExpression()); } - private void logBrokerResponse(long requestId, String query, RequestStatistics requestStatistics, + private void logBrokerResponse(long requestId, String query, RequestContext requestContext, BrokerRequest brokerRequest, int numUnavailableSegments, ServerStats serverStats, BrokerResponseNative brokerResponse, long totalTimeMs) { LOGGER.debug("Broker Response: {}", brokerResponse); @@ -649,7 +649,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { brokerResponse.getNumConsumingSegmentsQueried(), numUnavailableSegments, brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(), - requestStatistics.getReduceTimeMillis(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), + requestContext.getReduceTimeMillis(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), brokerResponse.getOfflineTotalCpuTimeNs(), brokerResponse.getOfflineThreadCpuTimeNs(), brokerResponse.getOfflineSystemActivitiesCpuTimeNs(), brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(), @@ -687,11 +687,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * <p>Currently only supports subquery within the filter. */ private void handleSubquery(PinotQuery pinotQuery, long requestId, JsonNode jsonRequest, - @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics) + @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception { Expression filterExpression = pinotQuery.getFilterExpression(); if (filterExpression != null) { - handleSubquery(filterExpression, requestId, jsonRequest, requesterIdentity, requestStatistics); + handleSubquery(filterExpression, requestId, jsonRequest, requesterIdentity, requestContext); } } @@ -703,7 +703,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * IN_ID_SET transform function. */ private void handleSubquery(Expression expression, long requestId, JsonNode jsonRequest, - @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics) + @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception { Function function = expression.getFunctionCall(); if (function == null) { @@ -716,7 +716,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { Preconditions.checkState(subqueryLiteral != null, "Second argument of IN_SUBQUERY must be a literal (subquery)"); String subquery = subqueryLiteral.getStringValue(); BrokerResponseNative response = - handleRequest(requestId, subquery, jsonRequest, requesterIdentity, requestStatistics); + handleRequest(requestId, subquery, jsonRequest, requesterIdentity, requestContext); if (response.getExceptionsSize() != 0) { throw new RuntimeException("Caught exception while executing subquery: " + subquery); } @@ -725,7 +725,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { operands.set(1, RequestUtils.getLiteralExpression(serializedIdSet)); } else { for (Expression operand : operands) { - handleSubquery(operand, requestId, jsonRequest, requesterIdentity, requestStatistics); + handleSubquery(operand, requestId, jsonRequest, requesterIdentity, requestContext); } } } @@ -1097,7 +1097,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * Processes the literal only query. */ private BrokerResponseNative processLiteralOnlyQuery(PinotQuery pinotQuery, long compilationStartTimeNs, - RequestStatistics requestStatistics) { + RequestContext requestContext) { BrokerResponseNative brokerResponse = new BrokerResponseNative(); List<String> columnNames = new ArrayList<>(); List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(); @@ -1114,8 +1114,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - compilationStartTimeNs); brokerResponse.setTimeUsedMs(totalTimeMs); - requestStatistics.setQueryProcessingTime(totalTimeMs); - augmentStatistics(requestStatistics, brokerResponse); + requestContext.setQueryProcessingTime(totalTimeMs); + augmentStatistics(requestContext, brokerResponse); return brokerResponse; } @@ -1516,13 +1516,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * Processes the optimized broker requests for both OFFLINE and REALTIME table. */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestStatistics requestStatistics) + BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, + List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, + List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception; - private static void augmentStatistics(RequestStatistics statistics, BrokerResponse response) { + private static void augmentStatistics(RequestContext statistics, BrokerResponse response) { statistics.setTotalDocs(response.getTotalDocs()); statistics.setNumDocsScanned(response.getNumDocsScanned()); statistics.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 786aa8b324..93591dee71 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -23,7 +23,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.common.response.BrokerResponse; -import org.apache.pinot.spi.trace.RequestStatistics; +import org.apache.pinot.spi.trace.RequestContext; @ThreadSafe @@ -34,6 +34,6 @@ public interface BrokerRequestHandler { void shutDown(); BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, - RequestStatistics requestStatistics) + RequestContext requestContext) throws Exception; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index e2c7cf40b3..468add98c2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -41,7 +41,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.trace.RequestStatistics; +import org.apache.pinot.spi.trace.RequestContext; /** @@ -81,18 +81,20 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, - List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics) + List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { // TODO: Support failure detection assert offlineBrokerRequest != null || realtimeBrokerRequest != null; Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - sendRequest(TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap); + sendRequest(TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap, + requestContext.isSampledRequest()); } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - sendRequest(TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap); + sendRequest(TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap, + requestContext.isSampledRequest()); } return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); @@ -103,7 +105,7 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { */ private void sendRequest(TableType tableType, BrokerRequest brokerRequest, Map<ServerInstance, List<String>> routingTable, - Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap) { + Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) { for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); List<String> segments = routingEntry.getValue(); @@ -111,7 +113,8 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { int port = serverInstance.getGrpcPort(); // TODO: enable throttling on per host bases. Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port, - new GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true)); + new GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true) + .setEnableTrace(trace)); responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC), streamingResponse); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index f16c12f893..2e399211cb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -49,7 +49,8 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerResponse; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.trace.RequestStatistics; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,9 +96,12 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestStatistics requestStatistics) + RequestContext requestContext) throws Exception { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + if (requestContext.isSampledRequest()) { + serverBrokerRequest.getPinotQuery().putToQueryOptions(CommonConstants.Broker.Request.TRACE, "true"); + } String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName()); long scatterGatherStartTimeNs = System.nanoTime(); @@ -134,7 +138,7 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl _brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap, reduceTimeOutMs, _brokerMetrics); final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs; - requestStatistics.setReduceTimeNanos(reduceTimeNanos); + requestContext.setReduceTimeNanos(reduceTimeNanos); _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos); brokerResponse.setNumServersQueried(numServersQueried); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java index bee32f9432..704d397c35 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java @@ -30,7 +30,7 @@ import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.trace.RequestStatistics; +import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.sql.parsers.CalciteSqlParser; @@ -129,7 +129,7 @@ public class LiteralOnlyBrokerRequestTest { RANDOM.nextBytes(randBytes); String ranStr = BytesUtils.toHexString(randBytes); JsonNode request = new ObjectMapper().readTree(String.format("{\"sql\":\"SELECT %d, '%s'\"}", randNum, ranStr)); - RequestStatistics requestStats = Tracing.getTracer().createRequestScope(); + RequestContext requestStats = Tracing.getTracer().createRequestScope(); BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats); Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), String.format("%d", randNum)); Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnDataType(0), @@ -154,7 +154,7 @@ public class LiteralOnlyBrokerRequestTest { long currentTsMin = System.currentTimeMillis(); JsonNode request = new ObjectMapper().readTree( "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}"); - RequestStatistics requestStats = Tracing.getTracer().createRequestScope(); + RequestContext requestStats = Tracing.getTracer().createRequestScope(); BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats); long currentTsMax = System.currentTimeMillis(); Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), "currentTs"); @@ -225,7 +225,7 @@ public class LiteralOnlyBrokerRequestTest { ObjectMapper objectMapper = new ObjectMapper(); // Test 1: select constant JsonNode request = objectMapper.readTree("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}"); - RequestStatistics requestStats = Tracing.getTracer().createRequestScope(); + RequestContext requestStats = Tracing.getTracer().createRequestScope(); BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats); checkExplainResultSchema(brokerResponse.getResultTable().getDataSchema(), diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java similarity index 98% rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java index 649688b0f2..60e1a1cca8 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; * This object can be used to publish the query processing statistics to a stream for * post-processing at a finer level than metrics. */ -public class DefaultRequestStatistics implements RequestScope { +public class DefaultRequestContext implements RequestScope { private static final String DEFAULT_TABLE_NAME = "NotYetParsed"; @@ -64,7 +64,7 @@ public class DefaultRequestStatistics implements RequestScope { private FanoutType _fanoutType; private int _numUnavailableSegments; - public DefaultRequestStatistics() { + public DefaultRequestContext() { } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java similarity index 97% rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java index 9f47d348b0..89f768608b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.spi.trace; -public interface RequestStatistics { +public interface RequestContext { long getOfflineSystemActivitiesCpuTimeNs(); void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs); @@ -51,6 +51,10 @@ public interface RequestStatistics { long getRequestId(); + default boolean isSampledRequest() { + return false; + } + long getRequestArrivalTimeMillis(); long getReduceTimeMillis(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java index 450ad20e23..3d1c7b052f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java @@ -22,5 +22,5 @@ package org.apache.pinot.spi.trace; * A scope wrapping an end to end synchronous pinot request. * Can be extended by a custom tracer to meter request latency. */ -public interface RequestScope extends Scope, RequestStatistics { +public interface RequestScope extends Scope, RequestContext { } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java index bc448d0461..bf07b6f828 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java @@ -48,7 +48,7 @@ public interface Tracer { * @return the request record */ default RequestScope createRequestScope() { - return new DefaultRequestStatistics(); + return new DefaultRequestContext(); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org