This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4f5030530f Add request id to the V2 broker response (#10706) 4f5030530f is described below commit 4f5030530f55e9a08ad9e3cdbf8a5d96319bbb57 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Wed May 10 11:34:36 2023 +0530 Add request id to the V2 broker response (#10706) * Add request id to the V2 broker response * Add unit test --------- Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> --- .../MultiStageBrokerRequestHandler.java | 1 + .../MultiStageBrokerRequestHandlerTest.java | 92 ++++++++++++++++++++++ .../response/broker/BrokerResponseNativeV2.java | 25 ++++-- 3 files changed, 111 insertions(+), 7 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 860592392f..ffc72c0732 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -223,6 +223,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs)); brokerResponse.setTimeUsedMs(totalTimeMs); brokerResponse.setResultTable(queryResults); + brokerResponse.setRequestId(String.valueOf(requestId)); for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) { if (entry.getKey() == 0) { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java new file mode 100644 index 0000000000..c8ebfa0266 --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.query.service.QueryConfig; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.DefaultRequestContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MultiStageBrokerRequestHandlerTest { + + private PinotConfiguration _config; + @Mock + private BrokerRoutingManager _routingManager; + + private AccessControlFactory _accessControlFactory; + @Mock + private QueryQuotaManager _queryQuotaManager; + @Mock + private TableCache _tableCache; + + @Mock + private BrokerMetrics _brokerMetrics; + + private MultiStageBrokerRequestHandler _requestHandler; + + @BeforeClass + public void setUp() { + MockitoAnnotations.openMocks(this); + _config = new PinotConfiguration(); + _config.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, "10000"); + _config.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, "12345"); + _accessControlFactory = new AllowAllAccessControlFactory(); + _requestHandler = + new MultiStageBrokerRequestHandler(_config, "testBrokerId", _routingManager, _accessControlFactory, + _queryQuotaManager, _tableCache, _brokerMetrics); + } + + @Test + public void testSetRequestId() + throws Exception { + String sampleSqlQuery = "SELECT * FROM testTable"; + String sampleJsonRequest = String.format("{\"sql\":\"%s\"}", sampleSqlQuery); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonRequest = objectMapper.readTree(sampleJsonRequest); + RequestContext requestContext = new DefaultRequestContext(); + + _requestHandler.handleRequest(jsonRequest, null, null, requestContext); + long expectedRequestId = 1L; + Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly"); + + _requestHandler.handleRequest(jsonRequest, null, null, requestContext); + expectedRequestId += 1L; + Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly"); + } + + @AfterClass + public void tearDown() { + _requestHandler.shutDown(); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 2eacbc23aa..8465d47f81 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -37,15 +37,17 @@ import org.apache.pinot.spi.utils.JsonUtils; * Supports serialization via JSON. */ @JsonPropertyOrder({ - "resultTable", "stageStats", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried", - "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", - "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", - "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", - "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", - "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics", - "traceInfo" + "resultTable", "requestId", "stageStats", "exceptions", "numServersQueried", "numServersResponded", + "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", + "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", + "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", + "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", + "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", + "realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo" }) public class BrokerResponseNativeV2 extends BrokerResponseNative { + private String _requestId; + private final Map<Integer, BrokerResponseStats> _stageIdStats = new HashMap<>(); public BrokerResponseNativeV2() { @@ -91,4 +93,13 @@ public class BrokerResponseNativeV2 extends BrokerResponseNative { public Map<Integer, BrokerResponseStats> getStageIdStats() { return _stageIdStats; } + + @JsonProperty("requestId") + public String getRequestId() { + return _requestId; + } + + public void setRequestId(String requestId) { + _requestId = requestId; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org