This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new ce7524c8b0 Improve FailureDetector logic in QueryDispatcher for MSQE (#15084)
ce7524c8b0 is described below

commit ce7524c8b073904e4c054b9c9ccd43ab7152313f
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Wed Feb 19 07:51:19 2025 +0530

    Improve FailureDetector logic in QueryDispatcher for MSQE (#15084)
---
 .../pinot/broker/broker/helix/BaseBrokerStarter.java | 2 +-
 .../MultiStageBrokerRequestHandler.java | 2 +-
 .../pinot/query/service/dispatch/QueryDispatcher.java | 19 +++++++++++--------
 .../query/service/dispatch/QueryDispatcherTest.java | 3 ++-
 4 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 35e10361fc..cc68a93945 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -501,7 +501,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     String hostname = _brokerConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = Integer.parseInt(_brokerConf.getProperty(
         CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
-    return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf));
+    return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf), _failureDetector);
   }
 
   private void updateInstanceConfigAndBrokerResourceIfNeeded() {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index cbbdf8cb19..d17361926d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -122,7 +122,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
     _queryDispatcher =
-        new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), tlsConfig, failureDetector,
+        new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), failureDetector, tlsConfig,
             this.isQueryCancellationEnabled());
     LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
             + "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 06e230dce9..de347e7c07 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -112,15 +112,14 @@ public class QueryDispatcher {
   private final Map<Long, Set<QueryServerInstance>> _serversByQuery;
   private final PhysicalTimeSeriesBrokerPlanVisitor _timeSeriesBrokerPlanVisitor =
       new PhysicalTimeSeriesBrokerPlanVisitor();
-  @Nullable
   private final FailureDetector _failureDetector;
 
-  public QueryDispatcher(MailboxService mailboxService) {
-    this(mailboxService, null, null, false);
+  public QueryDispatcher(MailboxService mailboxService, FailureDetector failureDetector) {
+    this(mailboxService, failureDetector, null, false);
   }
 
-  public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig,
-      @Nullable FailureDetector failureDetector, boolean enableCancellation) {
+  public QueryDispatcher(MailboxService mailboxService, FailureDetector failureDetector, @Nullable TlsConfig tlsConfig,
+      boolean enableCancellation) {
     _mailboxService = mailboxService;
     _executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(),
         new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));
@@ -271,9 +270,7 @@ public class QueryDispatcher {
       } catch (Throwable t) {
         LOGGER.warn("Caught exception while dispatching query: {} to server: {}", requestId, serverInstance, t);
         callbackConsumer.accept(new AsyncResponse<>(serverInstance, null, t));
-        if (_failureDetector != null) {
-          _failureDetector.markServerUnhealthy(serverInstance.getInstanceId());
-        }
+        _failureDetector.markServerUnhealthy(serverInstance.getInstanceId());
       }
     }
 
@@ -284,6 +281,12 @@ public class QueryDispatcher {
           dispatchCallbacks.poll(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
       if (resp != null) {
         if (resp.getThrowable() != null) {
+          // If it's a connectivity issue between the broker and the server, mark the server as unhealthy to prevent
+          // subsequent query failures
+          if (getOrCreateDispatchClient(resp.getServerInstance()).getChannel().getState(false)
+              != ConnectivityState.READY) {
+            _failureDetector.markServerUnhealthy(resp.getServerInstance().getInstanceId());
+          }
           throw new RuntimeException(
               String.format("Error dispatching query: %d to server: %s", requestId, resp.getServerInstance()),
               resp.getThrowable());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 6f64791ec6..c33338f1ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.failuredetector.FailureDetector;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
@@ -72,7 +73,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
         QueryEnvironmentTestBase.TABLE_SCHEMAS, QueryEnvironmentTestBase.SERVER1_SEGMENTS,
         QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
-    _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class));
+    _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class), Mockito.mock(FailureDetector.class));
   }
 
   @AfterClass

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org