This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch move-brokerId-extraction-to-BaseBrokerStarter in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 892ce9baf841a4412663d760c3bf880255b91574 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Sun Dec 11 11:54:27 2022 -0800 Move brokerId extraction to BaseBrokerStarter --- .../broker/broker/helix/BaseBrokerStarter.java | 21 ++++++++++++++++----- .../requesthandler/BaseBrokerRequestHandler.java | 13 ++----------- .../BrokerRequestHandlerDelegate.java | 5 ++++- .../requesthandler/GrpcBrokerRequestHandler.java | 4 ++-- .../MultiStageBrokerRequestHandler.java | 11 ++++++----- .../SingleConnectionBrokerRequestHandler.java | 10 +++++----- .../BaseBrokerRequestHandlerTest.java | 2 +- .../LiteralOnlyBrokerRequestTest.java | 18 +++++++++--------- 8 files changed, 45 insertions(+), 39 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 694e6fde1c..ab35409881 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -20,6 +20,7 @@ package org.apache.pinot.broker.broker.helix; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -264,21 +265,22 @@ public abstract class BaseBrokerStarter implements ServiceStartable { NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX); // Create Broker request handler. + String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId()); String brokerRequestHandlerType = _brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE); BrokerRequestHandler singleStageBrokerRequestHandler = null; if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { singleStageBrokerRequestHandler = - new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, + new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, null); } else { // default request handler type, e.g. netty if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) { singleStageBrokerRequestHandler = - new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, + new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager); } else { singleStageBrokerRequestHandler = - new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, + new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager); } } @@ -289,11 +291,11 @@ public abstract class BaseBrokerStarter implements ServiceStartable { // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport. // TODO: decouple protocol and engine selection. multiStageBrokerRequestHandler = - new MultiStageBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, + new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics); } - _brokerRequestHandler = new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, + _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler, multiStageBrokerRequestHandler, _brokerMetrics); _brokerRequestHandler.start(); String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL); @@ -433,6 +435,15 @@ public abstract class BaseBrokerStarter implements ServiceStartable { new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown)))); } + private String getDefaultBrokerId() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (Exception e) { + LOGGER.error("Caught exception while getting default broker Id", e); + return ""; + } + } + @Override public void stop() { LOGGER.info("Shutting down Pinot broker"); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index ae88689a51..ba2b0e8dac 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -129,9 +129,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private final boolean _enableDistinctCountBitmapOverride; private final Map<Long, QueryServers> _queriesById; - public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, + public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) { + _brokerId = brokerId; _config = config; _routingManager = routingManager; _accessControlFactory = accessControlFactory; @@ -146,7 +147,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _enableDistinctCountBitmapOverride = _config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false); - _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId()); _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryResponseLimit = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); @@ -160,15 +160,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation); } - private String getDefaultBrokerId() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (Exception e) { - LOGGER.error("Caught exception while getting default broker Id", e); - return ""; - } - } - @Override public Map<Long, String> getRunningQueries() { Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index c6eee5f04f..3e6a0598be 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -49,9 +49,11 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { private final BrokerRequestHandler _singleStageBrokerRequestHandler; private final BrokerRequestHandler _multiStageWorkerRequestHandler; private final BrokerMetrics _brokerMetrics; + private final String _brokerId; - public BrokerRequestHandlerDelegate(BrokerRequestHandler singleStageBrokerRequestHandler, + public BrokerRequestHandlerDelegate(String brokerId, BrokerRequestHandler singleStageBrokerRequestHandler, @Nullable BrokerRequestHandler multiStageWorkerRequestHandler, BrokerMetrics brokerMetrics) { + _brokerId = brokerId; _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler; _multiStageWorkerRequestHandler = multiStageWorkerRequestHandler; _brokerMetrics = brokerMetrics; @@ -81,6 +83,7 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception { + requestContext.setBrokerId(_brokerId); if (sqlNodeAndOptions == null) { try { sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 2af8027668..95d17c955b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -59,10 +59,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { private final PinotStreamingQueryClient _streamingQueryClient; // TODO: Support TLS - public GrpcBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, + public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { - super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); LOGGER.info("Using Grpc BrokerRequestHandler."); _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 6cdd68cd80..ee83a745b3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -72,15 +72,16 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final QueryEnvironment _queryEnvironment; private final QueryDispatcher _queryDispatcher; - public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, - AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, - BrokerMetrics brokerMetrics) { - super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig, + BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, + QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) { + super(config, brokerIdFromConfig, routingManager, accessControlFactory, queryQuotaManager, tableCache, + brokerMetrics); LOGGER.info("Using Multi-stage BrokerRequestHandler."); String reducerHostname = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME); if (reducerHostname == null) { // use broker ID as host name, but remove the - String brokerId = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID); + String brokerId = brokerIdFromConfig; brokerId = brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? brokerId.substring( CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId; brokerId = StringUtils.split(brokerId, "_").length > 1 ? StringUtils.split(brokerId, "_")[0] : brokerId; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index c34b8dda6a..d78aa5ffe8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -69,11 +69,11 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl private final QueryRouter _queryRouter; private final FailureDetector _failureDetector; - public SingleConnectionBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, - AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, - BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig tlsConfig, - ServerRoutingStatsManager serverRoutingStatsManager) { - super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, + QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig, + TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); LOGGER.info("Using Netty BrokerRequestHandler."); _brokerReduceService = new BrokerReduceService(_config); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java index 88f72600e5..d2ea5c9b7a 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java @@ -206,7 +206,7 @@ public class BaseBrokerRequestHandlerTest { PinotConfiguration config = new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true")); BaseBrokerRequestHandler requestHandler = - new BaseBrokerRequestHandler(config, routingManager, new AllowAllAccessControlFactory(), + new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(), queryQuotaManager, tableCache, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) { @Override diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java index 389db74fb6..a9ee2a5dee 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java @@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest { public void testBrokerRequestHandler() throws Exception { SingleConnectionBrokerRequestHandler requestHandler = - new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null, - new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), - null, null, mock(ServerRoutingStatsManager.class)); + new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null, + null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null, + null, mock(ServerRoutingStatsManager.class)); long randNum = RANDOM.nextLong(); byte[] randBytes = new byte[12]; @@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest { public void testBrokerRequestHandlerWithAsFunction() throws Exception { SingleConnectionBrokerRequestHandler requestHandler = - new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null, - new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), - null, null, mock(ServerRoutingStatsManager.class)); + new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null, + null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null, + null, mock(ServerRoutingStatsManager.class)); long currentTsMin = System.currentTimeMillis(); JsonNode request = JsonUtils.stringToJsonNode( "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}"); @@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest { public void testExplainPlanLiteralOnly() throws Exception { SingleConnectionBrokerRequestHandler requestHandler = - new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null, - new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), - null, null, mock(ServerRoutingStatsManager.class)); + new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null, + null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null, + null, mock(ServerRoutingStatsManager.class)); // Test 1: select constant JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org