This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e307106a59 AdaptiveServerSelection: update response stats for servers that have not responded (#9801)
e307106a59 is described below

commit e307106a59646a8e1d9475baabe66fa3cde09490
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Mon Nov 14 23:57:58 2022 -0800

    AdaptiveServerSelection: update response stats for servers that have not responded (#9801)
---
 .../pinot/core/transport/AsyncQueryResponse.java   | 26 +++++++++++-
 .../apache/pinot/core/transport/QueryRouter.java   | 13 +-----
 .../pinot/core/transport/QueryRoutingTest.java     | 48 +++++++++++++++++++++-
 3 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 7924a9afdb..3aecb184be 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.datatable.DataTable;
+import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 
 
 /**
@@ -43,12 +44,13 @@ public class AsyncQueryResponse implements QueryResponse {
   private final CountDownLatch _countDownLatch;
   private final long _maxEndTimeMs;
   private final long _timeoutMs;
+  private final ServerRoutingStatsManager _serverRoutingStatsManager;
 
   private volatile ServerRoutingInstance _failedServer;
   private volatile Exception _exception;
 
   public AsyncQueryResponse(QueryRouter queryRouter, long requestId, 
Set<ServerRoutingInstance> serversQueried,
-      long startTimeMs, long timeoutMs) {
+      long startTimeMs, long timeoutMs, ServerRoutingStatsManager 
serverRoutingStatsManager) {
     _queryRouter = queryRouter;
     _requestId = requestId;
     int numServersQueried = serversQueried.size();
@@ -59,6 +61,7 @@ public class AsyncQueryResponse implements QueryResponse {
     _countDownLatch = new CountDownLatch(numServersQueried);
     _timeoutMs = timeoutMs;
     _maxEndTimeMs = startTimeMs + timeoutMs;
+    _serverRoutingStatsManager = serverRoutingStatsManager;
   }
 
   @Override
@@ -84,6 +87,17 @@ public class AsyncQueryResponse implements QueryResponse {
       _status.compareAndSet(Status.IN_PROGRESS, finish ? Status.COMPLETED : 
Status.TIMED_OUT);
       return _responseMap;
     } finally {
+      // Update ServerRoutingStats.
+      for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : 
_responseMap.entrySet()) {
+        ServerResponse response = entry.getValue();
+        if (response == null || response.getDataTable() == null) {
+          // These are servers from which a response was not received. So 
update query response stats for such
+          // servers with maximum latency i.e timeout value.
+          
_serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, 
entry.getKey().getInstanceId(),
+              _timeoutMs);
+        }
+      }
+
       _queryRouter.markQueryDone(_requestId);
     }
   }
@@ -134,7 +148,15 @@ public class AsyncQueryResponse implements QueryResponse {
 
   void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable 
dataTable, int responseSize,
       int deserializationTimeMs) {
-    _responseMap.get(serverRoutingInstance).receiveDataTable(dataTable, 
responseSize, deserializationTimeMs);
+    ServerResponse response = _responseMap.get(serverRoutingInstance);
+    response.receiveDataTable(dataTable, responseSize, deserializationTimeMs);
+
+    // Record query completion stats immediately after receiving the response 
from the server instead of waiting
+    // for all servers to respond. This helps to keep the stats up-to-date.
+    long latencyMs = response.getResponseDelayMs();
+    _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, 
serverRoutingInstance.getInstanceId(),
+        latencyMs);
+
     _numServersResponded.getAndIncrement();
     _countDownLatch.countDown();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 7a13035eae..e0ff080e11 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -116,7 +116,8 @@ public class QueryRouter {
 
     // Create the asynchronous query response with the request map
     AsyncQueryResponse asyncQueryResponse =
-        new AsyncQueryResponse(this, requestId, requestMap.keySet(), 
System.currentTimeMillis(), timeoutMs);
+        new AsyncQueryResponse(this, requestId, requestMap.keySet(), 
System.currentTimeMillis(), timeoutMs,
+            _serverRoutingStatsManager);
     _asyncQueryResponseMap.put(requestId, asyncQueryResponse);
     for (Map.Entry<ServerRoutingInstance, InstanceRequest> entry : 
requestMap.entrySet()) {
       ServerRoutingInstance serverRoutingInstance = entry.getKey();
@@ -149,8 +150,6 @@ public class QueryRouter {
       AsyncQueryResponse asyncQueryResponse, Exception e) {
     LOGGER.error("Caught exception while sending request {} to server: {}, 
marking query failed", requestId,
         serverRoutingInstance, e);
-    _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, 
serverRoutingInstance.getInstanceId(),
-        (int) asyncQueryResponse.getTimeoutMs());
     asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
   }
 
@@ -183,20 +182,12 @@ public class QueryRouter {
     // Query future might be null if the query is already done (maybe due to 
failure)
     if (asyncQueryResponse != null) {
       asyncQueryResponse.receiveDataTable(serverRoutingInstance, dataTable, 
responseSize, deserializationTimeMs);
-
-      // Record query completion stats immediately after receiving the 
response from the server instead of waiting
-      // for the reduce phase.
-      long latencyMs = 
asyncQueryResponse.getServerResponseDelayMs(serverRoutingInstance);
-      _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, 
serverRoutingInstance.getInstanceId(),
-          latencyMs);
     }
   }
 
   void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception 
exception) {
     for (AsyncQueryResponse asyncQueryResponse : 
_asyncQueryResponseMap.values()) {
       asyncQueryResponse.markServerDown(serverRoutingInstance, exception);
-      
_serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(),
-          serverRoutingInstance.getInstanceId(), (int) 
asyncQueryResponse.getTimeoutMs());
     }
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 9fdaf7fb7d..5e1fa04176 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.transport;
 
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datatable.DataTable;
@@ -33,7 +34,9 @@ import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -60,10 +63,18 @@ public class QueryRoutingTest {
       Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
 
   private QueryRouter _queryRouter;
+  private ServerRoutingStatsManager _serverRoutingStatsManager;
+  int _requestCount;
 
   @BeforeClass
   public void setUp() {
-    _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), 
mock(ServerRoutingStatsManager.class));
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
+    PinotConfiguration cfg = new PinotConfiguration(properties);
+    _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    _serverRoutingStatsManager.init();
+    _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), 
_serverRoutingStatsManager);
+    _requestCount = 0;
   }
 
   private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes) {
@@ -88,6 +99,7 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(0, responseBytes);
@@ -95,13 +107,17 @@ public class QueryRoutingTest {
 
     // OFFLINE only
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 600_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
     assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    // 2 requests - query submit and query response.
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // REALTIME only
     asyncQueryResponse =
@@ -112,6 +128,9 @@ public class QueryRoutingTest {
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Hybrid
     asyncQueryResponse =
@@ -127,6 +146,9 @@ public class QueryRoutingTest {
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    _requestCount += 4;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Shut down the server
     queryServer.shutDown();
@@ -136,6 +158,7 @@ public class QueryRoutingTest {
   public void testInvalidResponse()
       throws Exception {
     long requestId = 123;
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(0, new byte[0]);
@@ -154,6 +177,9 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
     assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Shut down the server
     queryServer.shutDown();
@@ -166,6 +192,7 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(0, responseBytes);
@@ -184,6 +211,9 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
     assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Shut down the server
     queryServer.shutDown();
@@ -199,6 +229,7 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(500, responseBytes);
@@ -221,6 +252,10 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
     assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
+
 
     // Submit query after server is down
     startTimeMs = System.currentTimeMillis();
@@ -237,6 +272,15 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
     assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
+  }
+
+  private void waitForStatsUpdate(long taskCount) {
+    TestUtils.waitForCondition(aVoid -> {
+      return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount);
+    }, 5L, 5000, "Failed to record stats for AdaptiveServerSelectorTest");
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to