This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d6862727c2 [multistage] Avoid Broker Request Id Collision (#10789) d6862727c2 is described below commit d6862727c2352c939ea55e14f834e73d8b39cb95 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Wed May 24 02:50:03 2023 +0530 [multistage] Avoid Broker Request Id Collision (#10789) * [multistage] Use multistage requestID generator * Add UT for negative check * Make requestId more readable --- .../MultiStageBrokerRequestHandler.java | 36 +++++++++++++++++++++- .../MultiStageBrokerRequestHandlerTest.java | 24 ++++++++++----- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index d4ece61fa6..85839ebe66 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.calcite.jdbc.CalciteSchemaBuilder; import org.apache.commons.lang3.StringUtils; @@ -76,6 +77,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final MailboxService _mailboxService; private final QueryEnvironment _queryEnvironment; private final QueryDispatcher _queryDispatcher; + private final MultiStageRequestIdGenerator _multistageRequestIdGenerator; public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, @@ -109,13 +111,15 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { // TODO: move this to a startUp() function. _mailboxService.start(); + + _multistageRequestIdGenerator = new MultiStageRequestIdGenerator(brokerIdFromConfig); } @Override public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception { - long requestId = _requestIdGenerator.incrementAndGet(); + long requestId = _multistageRequestIdGenerator.get(); requestContext.setRequestId(requestId); requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis()); @@ -318,4 +322,34 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { _queryDispatcher.shutdown(); _mailboxService.shutdown(); } + + /** + * OpChains in Multistage queries are identified by the requestId and the stage-id. v1 Engine uses an incrementing + * long to generate requestId, so the requestIds are numbered [0, 1, 2, ...]. When running with multiple brokers, + * it could be that two brokers end up generating the same requestId which could lead to weird query errors. This + * requestId generator addresses that by: + * <ol> + * <li> + * Using a mask computed using the hash-code of the broker-id to ensure two brokers don't arrive at the same + * requestId. This mask becomes the most significant 9 digits (in base-10). + * </li> + * <li> + * Using a auto-incrementing counter for the least significant 9 digits (in base-10). + * </li> + * </ol> + */ + static class MultiStageRequestIdGenerator { + private static final long OFFSET = 1_000_000_000L; + private final long _mask; + private final AtomicLong _incrementingId = new AtomicLong(0); + + public MultiStageRequestIdGenerator(String brokerId) { + _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET; + } + + public long get() { + long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET; + return _mask + normalized; + } + } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java index c8ebfa0266..236cb2cdde 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java @@ -20,6 +20,8 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.List; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; @@ -76,13 +78,21 @@ public class MultiStageBrokerRequestHandlerTest { JsonNode jsonRequest = objectMapper.readTree(sampleJsonRequest); RequestContext requestContext = new DefaultRequestContext(); - _requestHandler.handleRequest(jsonRequest, null, null, requestContext); - long expectedRequestId = 1L; - Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly"); - - _requestHandler.handleRequest(jsonRequest, null, null, requestContext); - expectedRequestId += 1L; - Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly"); + List<Long> requestIds = new ArrayList<>(); + // Request id should be unique each time, and there should be a difference of 1 between consecutive requestIds. + for (int iteration = 0; iteration < 10; iteration++) { + _requestHandler.handleRequest(jsonRequest, null, null, requestContext); + Assert.assertTrue(requestContext.getRequestId() >= 0, "Request ID should be non-negative"); + requestIds.add(requestContext.getRequestId()); + if (iteration != 0) { + Assert.assertEquals(1, requestIds.get(iteration) - requestIds.get(iteration - 1), + "Request Id should have difference of 1"); + } + } + Assert.assertEquals(10, requestIds.stream().distinct().count(), "Request Id should be unique"); + Assert.assertEquals(1, requestIds.stream().map(x -> (x >> 32)).distinct().count(), + "Request Id should have a broker-id specific mask for the 32 MSB"); + Assert.assertTrue(requestIds.stream().noneMatch(x -> x < 0), "Request Id should not be negative"); } @AfterClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org