This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ce7524c8b0 Improve FailureDetector logic in QueryDispatcher for MSQE (#15084)
ce7524c8b0 is described below

commit ce7524c8b073904e4c054b9c9ccd43ab7152313f
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Wed Feb 19 07:51:19 2025 +0530

    Improve FailureDetector logic in QueryDispatcher for MSQE (#15084)
---
 .../pinot/broker/broker/helix/BaseBrokerStarter.java  |  2 +-
 .../MultiStageBrokerRequestHandler.java               |  2 +-
 .../pinot/query/service/dispatch/QueryDispatcher.java | 19 +++++++++++--------
 .../query/service/dispatch/QueryDispatcherTest.java   |  3 ++-
 4 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 35e10361fc..cc68a93945 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -501,7 +501,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     String hostname = _brokerConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = Integer.parseInt(_brokerConf.getProperty(
         CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
-    return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf));
+    return new QueryDispatcher(new MailboxService(hostname, port, _brokerConf), _failureDetector);
   }
 
   private void updateInstanceConfigAndBrokerResourceIfNeeded() {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index cbbdf8cb19..d17361926d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -122,7 +122,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
     _queryDispatcher =
-        new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), tlsConfig, failureDetector,
+        new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), failureDetector, tlsConfig,
             this.isQueryCancellationEnabled());
     LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
             + "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 06e230dce9..de347e7c07 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -112,15 +112,14 @@ public class QueryDispatcher {
   private final Map<Long, Set<QueryServerInstance>> _serversByQuery;
   private final PhysicalTimeSeriesBrokerPlanVisitor _timeSeriesBrokerPlanVisitor
       = new PhysicalTimeSeriesBrokerPlanVisitor();
-  @Nullable
   private final FailureDetector _failureDetector;
 
-  public QueryDispatcher(MailboxService mailboxService) {
-    this(mailboxService, null, null, false);
+  public QueryDispatcher(MailboxService mailboxService, FailureDetector failureDetector) {
+    this(mailboxService, failureDetector, null, false);
   }
 
-  public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig,
-      @Nullable FailureDetector failureDetector, boolean enableCancellation) {
+  public QueryDispatcher(MailboxService mailboxService, FailureDetector failureDetector, @Nullable TlsConfig tlsConfig,
+      boolean enableCancellation) {
     _mailboxService = mailboxService;
     _executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(),
         new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));
@@ -271,9 +270,7 @@ public class QueryDispatcher {
       } catch (Throwable t) {
         LOGGER.warn("Caught exception while dispatching query: {} to server: {}", requestId, serverInstance, t);
         callbackConsumer.accept(new AsyncResponse<>(serverInstance, null, t));
-        if (_failureDetector != null) {
-          _failureDetector.markServerUnhealthy(serverInstance.getInstanceId());
-        }
+        _failureDetector.markServerUnhealthy(serverInstance.getInstanceId());
       }
     }
 
@@ -284,6 +281,12 @@ public class QueryDispatcher {
           dispatchCallbacks.poll(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
       if (resp != null) {
         if (resp.getThrowable() != null) {
+          // If it's a connectivity issue between the broker and the server, mark the server as unhealthy to prevent
+          // subsequent query failures
+          if (getOrCreateDispatchClient(resp.getServerInstance()).getChannel().getState(false)
+              != ConnectivityState.READY) {
+            _failureDetector.markServerUnhealthy(resp.getServerInstance().getInstanceId());
+          }
           throw new RuntimeException(
               String.format("Error dispatching query: %d to server: %s", requestId, resp.getServerInstance()),
               resp.getThrowable());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 6f64791ec6..c33338f1ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.failuredetector.FailureDetector;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
@@ -72,7 +73,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
         QueryEnvironmentTestBase.TABLE_SCHEMAS, QueryEnvironmentTestBase.SERVER1_SEGMENTS,
         QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
-    _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class));
+    _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class), Mockito.mock(FailureDetector.class));
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to