This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a86ba9c42f Add brokerId and brokerReduceTimeMs to the broker response stats (#11142) a86ba9c42f is described below commit a86ba9c42fa94703988a477620966eb5f31f6d89 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Wed Jul 26 23:54:01 2023 +0530 Add brokerId and brokerReduceTimeMs to the broker response stats (#11142) --- .../requesthandler/BaseBrokerRequestHandler.java | 2 ++ .../requesthandler/GrpcBrokerRequestHandler.java | 7 ++-- .../MultiStageBrokerRequestHandler.java | 32 +++++++++-------- .../org/apache/pinot/client/BrokerResponse.java | 6 ++++ .../org/apache/pinot/client/ExecutionStats.java | 6 ++++ .../pinot/common/response/BrokerResponse.java | 14 ++++++++ .../response/broker/BrokerResponseNative.java | 41 +++++++++++++++++----- .../response/broker/BrokerResponseStats.java | 16 ++++----- .../query/service/dispatch/QueryDispatcher.java | 12 ++++--- .../service/dispatch/QueryDispatcherTest.java | 10 ++++-- 10 files changed, 108 insertions(+), 38 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 5921435aab..4a8ed8f3e7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -265,6 +265,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } brokerResponse.setRequestId(String.valueOf(requestId)); + brokerResponse.setBrokerId(_brokerId); + brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis()); return brokerResponse; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 95d17c955b..56be2ea4fb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -103,8 +103,11 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap, requestContext.isSampledRequest()); } - return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, - _brokerMetrics); + final long startReduceTimeNanos = System.nanoTime(); + BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, + responseMap, timeoutMs, _brokerMetrics); + requestContext.setReduceTimeNanos(System.nanoTime() - startReduceTimeNanos); + return brokerResponse; } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 27e48e2ed7..5e030919fa 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -145,12 +145,18 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } String query = sql.asText(); requestContext.setQuery(query); - return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext); + BrokerResponse brokerResponse = handleRequest(sqlNodeAndOptions, request, requesterIdentity, requestContext); + + brokerResponse.setRequestId(String.valueOf(requestId)); + brokerResponse.setBrokerId(_brokerId); + brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis()); + return brokerResponse; } - private BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions, - JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) - throws Exception { + private BrokerResponse handleRequest(@Nullable SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) { + final String query = requestContext.getQuery(); + final long requestId = requestContext.getRequestId(); LOGGER.debug("SQL query for request {}: {}", requestId, query); long compilationStartTimeNs; @@ -168,7 +174,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId); String plan = queryPlanResult.getExplainPlan(); Set<String> tableNames = queryPlanResult.getTableNames(); - if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) { + if (!hasTableAccess(requesterIdentity, tableNames, requestContext)) { throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN); } @@ -194,13 +200,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { updatePhaseTimingForTables(tableNames, BrokerQueryPhase.REQUEST_COMPILATION, compilationTimeNs); // Validate table access. - if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) { + if (!hasTableAccess(requesterIdentity, tableNames, requestContext)) { throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN); } updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs); // Validate QPS quota - if (hasExceededQPSQuota(tableNames, requestId, requestContext)) { + if (hasExceededQPSQuota(tableNames, requestContext)) { String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query); return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } @@ -217,7 +223,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { long executionStartTimeNs = System.nanoTime(); try { - queryResults = _queryDispatcher.submitAndReduce(requestId, dispatchableSubPlan, _mailboxService, + queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, _mailboxService, _reducerScheduler, queryTimeoutMs, sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled); } catch (Throwable t) { @@ -234,7 +240,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs)); brokerResponse.setTimeUsedMs(totalTimeMs); brokerResponse.setResultTable(queryResults); - brokerResponse.setRequestId(String.valueOf(requestId)); for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) { if (entry.getKey() == 0) { @@ -263,26 +268,25 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { /** * Validates whether the requester has access to all the tables. */ - private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, long requestId, + private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, RequestContext requestContext) { boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity, tableNames); if (!hasAccess) { _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); - LOGGER.warn("Access denied for requestId {}", requestId); + LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId()); requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); return false; } - return true; } /** * Returns true if the QPS quota of the tables has exceeded. */ - private boolean hasExceededQPSQuota(Set<String> tableNames, long requestId, RequestContext requestContext) { + private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) { for (String tableName : tableNames) { if (!_queryQuotaManager.acquire(tableName)) { - LOGGER.warn("Request {}: query exceeds quota for table: {}", requestId, tableName); + LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); String rawTableName = TableNameBuilder.extractRawTableName(tableName); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java index c8cd0b8ce3..6c04097b5c 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JsonNode; */ public class BrokerResponse { private String _requestId; + private String _brokerId; private JsonNode _aggregationResults; private JsonNode _selectionResults; private JsonNode _resultTable; @@ -37,6 +38,7 @@ public class BrokerResponse { private BrokerResponse(JsonNode brokerResponse) { _requestId = brokerResponse.get("requestId") != null ? brokerResponse.get("requestId").asText() : "unknown"; + _brokerId = brokerResponse.get("brokerId") != null ? brokerResponse.get("brokerId").asText() : "unknown"; _aggregationResults = brokerResponse.get("aggregationResults"); _exceptions = brokerResponse.get("exceptions"); _selectionResults = brokerResponse.get("selectionResults"); @@ -87,4 +89,8 @@ public class BrokerResponse { public String getRequestId() { return _requestId; } + + public String getBrokerId() { + return _brokerId; + } } diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java index c66ca39b98..dc6bfd6b03 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java @@ -44,6 +44,7 @@ public class ExecutionStats { private static final String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs"; private static final String TOTAL_DOCS = "totalDocs"; private static final String NUM_GROUPS_LIMIT_REACHED = "numGroupsLimitReached"; + private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs"; private static final String TIME_USED_MS = "timeUsedMs"; private final JsonNode _brokerResponse; @@ -113,6 +114,10 @@ public class ExecutionStats { return _brokerResponse.has(TIME_USED_MS) ? _brokerResponse.get(TIME_USED_MS).asLong() : -1L; } + public long getBrokerReduceTimeMs() { + return _brokerResponse.has(BROKER_REDUCE_TIME_MS) ? _brokerResponse.get(BROKER_REDUCE_TIME_MS).asLong() : -1L; + } + @Override public String toString() { Map<String, Object> map = new HashMap<>(); @@ -128,6 +133,7 @@ public class ExecutionStats { map.put(MIN_CONSUMING_FRESHNESS_TIME_MS, getMinConsumingFreshnessTimeMs() + "ms"); map.put(TOTAL_DOCS, getTotalDocs()); map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached()); + map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms"); map.put(TIME_USED_MS, getTimeUsedMs() + "ms"); return map.toString(); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index 3ad49460bc..89fcc8c04b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -335,4 +335,18 @@ public interface BrokerResponse { * set request ID generated by broker */ void setRequestId(String requestId); + + /** + * get broker ID of the processing broker + */ + String getBrokerId(); + + /** + * set broker ID of the processing broker + */ + void setBrokerId(String requestId); + + long getBrokerReduceTimeMs(); + + void setBrokerReduceTimeMs(long brokerReduceTimeMs); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index f2580b9cc3..3e9345c25f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -41,14 +41,13 @@ import org.apache.pinot.spi.utils.JsonUtils; * Supports serialization via JSON. */ @JsonPropertyOrder({ - "resultTable", "requestId", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried", - "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", - "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", - "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", - "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", - "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics", - "traceInfo" -}) + "resultTable", "requestId", "brokerId", "exceptions", "numServersQueried", "numServersResponded", + "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", + "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", + "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", + "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", + "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", + "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo"}) public class BrokerResponseNative implements BrokerResponse { public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty(); public static final BrokerResponseNative NO_TABLE_RESULT = @@ -58,6 +57,7 @@ public class BrokerResponseNative implements BrokerResponse { public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT = getBrokerResponseExplainPlanOutput(); private String _requestId; + private String _brokerId; private int _numServersQueried = 0; private int _numServersResponded = 0; private long _numDocsScanned = 0L; @@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse { // the timestamp indicating the freshness of the data queried in consuming segments. // This can be ingestion timestamp if provided by the stream, or the last index time private long _minConsumingFreshnessTimeMs = 0L; + private long _brokerReduceTimeMs = 0L; private long _totalDocs = 0L; private boolean _numGroupsLimitReached = false; @@ -570,4 +571,28 @@ public class BrokerResponseNative implements BrokerResponse { public void setRequestId(String requestId) { _requestId = requestId; } + + @JsonProperty("brokerId") + @Override + public String getBrokerId() { + return _brokerId; + } + + @JsonProperty("brokerId") + @Override + public void setBrokerId(String requestId) { + _brokerId = requestId; + } + + @JsonProperty("brokerReduceTimeMs") + @Override + public long getBrokerReduceTimeMs() { + return _brokerReduceTimeMs; + } + + @JsonProperty("brokerReduceTimeMs") + @Override + public void setBrokerReduceTimeMs(long brokerReduceTimeMs) { + _brokerReduceTimeMs = brokerReduceTimeMs; + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java index f2361a7908..db23034efc 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java @@ -33,14 +33,14 @@ import org.apache.pinot.spi.utils.JsonUtils; // same metadataKey // TODO: Replace member fields with a simple map of <MetadataKey, Object> // TODO: Add a subStat field, stage level subStats will contain each operator stats -@JsonPropertyOrder({"requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit", - "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried", - "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", - "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", - "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", - "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", - "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", - "traceInfo", "operatorStats", "tableNames"}) +@JsonPropertyOrder({"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", + "stageExecutionUnit", "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", + "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", + "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", + "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", + "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", + "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", + "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"}) @JsonInclude(JsonInclude.Include.NON_DEFAULT) public class BrokerResponseStats extends BrokerResponseNative { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index e5ca95cbbe..9eff0bc6c4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -64,6 +64,7 @@ import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.service.QueryConfig; +import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.ByteArray; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; @@ -86,18 +87,21 @@ public class QueryDispatcher { new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT)); } - public ResultTable submitAndReduce(long requestId, DispatchableSubPlan dispatchableSubPlan, + public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, MailboxService mailboxService, OpChainSchedulerService scheduler, long timeoutMs, Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator, boolean traceEnabled) throws Exception { + final long requestId = context.getRequestId(); try { // submit all the distributed stages. int reduceStageId = submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions); // run reduce stage and return result. - return runReducer(requestId, dispatchableSubPlan, reduceStageId, timeoutMs, mailboxService, scheduler, - executionStatsAggregator, - traceEnabled); + long reduceStartTimeInNanos = System.nanoTime(); + ResultTable resultTable = runReducer(requestId, dispatchableSubPlan, reduceStageId, timeoutMs, mailboxService, + scheduler, executionStatsAggregator, traceEnabled); + context.setReduceTimeNanos(System.nanoTime() - reduceStartTimeInNanos); + return resultTable; } catch (Exception e) { cancel(requestId, dispatchableSubPlan); throw new RuntimeException("Error executing query: " diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java index 3cd3c839fb..ab85a15d60 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java @@ -38,6 +38,8 @@ import org.apache.pinot.query.planner.PlannerUtils; import org.apache.pinot.query.runtime.QueryRunner; import org.apache.pinot.query.service.QueryServer; import org.apache.pinot.query.testutils.QueryTestUtils; +import org.apache.pinot.spi.trace.DefaultRequestContext; +import org.apache.pinot.spi.trace.RequestContext; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -138,8 +140,10 @@ public class QueryDispatcherTest extends QueryTestSet { DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); QueryDispatcher dispatcher = new QueryDispatcher(); long requestId = RANDOM_REQUEST_ID_GEN.nextLong(); + RequestContext context = new DefaultRequestContext(); + context.setRequestId(requestId); try { - dispatcher.submitAndReduce(requestId, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false); + dispatcher.submitAndReduce(context, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false); Assert.fail("Method call above should have failed"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("Error executing query")); @@ -160,9 +164,11 @@ public class QueryDispatcherTest extends QueryTestSet { DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); QueryDispatcher dispatcher = new QueryDispatcher(); long requestId = RANDOM_REQUEST_ID_GEN.nextLong(); + RequestContext context = new DefaultRequestContext(); + context.setRequestId(requestId); try { // will throw b/c mailboxService is null - dispatcher.submitAndReduce(requestId, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false); + dispatcher.submitAndReduce(context, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false); Assert.fail("Method call above should have failed"); } catch (Exception e) { System.out.println("e = " + e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org