This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3452ef9fa9 Improve Adaptive Server Selection to penalize servers 
returning server side exceptions (#14029)
3452ef9fa9 is described below

commit 3452ef9fa9035134ad90d9da8a4f72acd5081c7f
Author: Kirupha Balasubramanian <kiruphabal...@gmail.com>
AuthorDate: Mon Sep 30 11:03:21 2024 -0700

    Improve Adaptive Server Selection to penalize servers returning server side 
exceptions (#14029)
    
    * PINOT-19249 ADSS penalize server with hardware issues WIP push
    
    PINOT-19249 Adding more unit test cases
    
    PINOT-19249 Addressing comments and changing code to cover edge cases
    
    PINOT-19249 Code changes based on comments
    
    PINOT-19249 Fixing test cases for ADSS penalizing servers with exceptions
    
    * Fixing linter
    
    * Empty-Commit
    
    * Review comments - Adding more comments and delta to assertEquals
---
 .../pinot/core/transport/AsyncQueryResponse.java   |  37 ++++-
 .../pinot/core/transport/QueryRoutingTest.java     | 176 +++++++++++++++++++++
 2 files changed, 208 insertions(+), 5 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 2ec90ab3b9..a9546cc054 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.HashUtil;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 
@@ -96,11 +97,14 @@ public class AsyncQueryResponse implements QueryResponse {
       // servers even if the query times out or if servers have not responded.
       for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : 
_responseMap.entrySet()) {
         ServerResponse response = entry.getValue();
-
-        // ServerResponse returns -1 if responseDelayMs is not set. This 
indicates that a response was not received
-        // from the server. Hence we set the latency to the timeout value.
-        long latency =
-            (response != null && response.getResponseDelayMs() >= 0) ? 
response.getResponseDelayMs() : _timeoutMs;
+        long latency;
+
+        // If server has not responded or if the server response has 
exceptions, the latency is set to timeout
+        if (hasServerNotResponded(response) || 
hasServerReturnedExceptions(response)) {
+          latency = _timeoutMs;
+        } else {
+          latency = response.getResponseDelayMs();
+        }
         _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, 
entry.getKey().getInstanceId(), latency);
       }
 
@@ -108,6 +112,29 @@ public class AsyncQueryResponse implements QueryResponse {
     }
   }
 
+  private boolean hasServerReturnedExceptions(ServerResponse response) {
+    if (response.getDataTable() != null && 
response.getDataTable().getExceptions().size() > 0) {
+      DataTable dataTable = response.getDataTable();
+      Map<Integer, String> exceptions = dataTable.getExceptions();
+
+      // If the server response has exceptions in the DataTable, the caller sets the latency to the 
timeout value.
+      for (Map.Entry<Integer, String> exception : exceptions.entrySet()) {
+        // Check if the exceptions received are server side exceptions
+        if (!QueryException.isClientError(exception.getKey())) {
+          return true;
+        }
+      }
+      return false;
+    }
+    return false;
+  }
+
+  private boolean hasServerNotResponded(ServerResponse response) {
+    // ServerResponse returns -1 if responseDelayMs is not set. This indicates 
that a response was not received
+    // from the server. Hence we set the latency to the timeout value.
+    return response == null || response.getResponseDelayMs() < 0;
+  }
+
   @Override
   public String getServerStats() {
     StringBuilder stringBuilder = new StringBuilder(
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index cec413e424..1b32149d06 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -27,9 +27,11 @@ import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
@@ -201,6 +203,180 @@ public class QueryRoutingTest {
     queryServer.shutDown();
   }
 
+  @Test
+  public void testLatencyForQueryServerException()
+      throws Exception {
+    long requestId = 123;
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+    Exception exception = new UnsupportedOperationException("Caught 
exception.");
+    ProcessingException processingException =
+        QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, 
exception);
+    dataTable.addException(processingException);
+    byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
+    // Start the server
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    queryServer.start();
+
+    // Send a query with ServerSide exception and check if the latency is set 
to timeout value.
+    Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+    AsyncQueryResponse asyncQueryResponse =
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+    Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    Double latencyAfter = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+    if (latencyBefore == null) {
+      // This means that no queries were run before this test. So we can just 
make sure that latencyAfter is equal to
+      //666.334.
+      // This corresponds to the EWMA value when a latency timeout value of 
1000 is set. Latency set to timeout value
+      //when server side exception occurs.
+      double serverEWMALatency = 666.334;
+      // Leaving an error budget of 2%
+      double delta = 13.32;
+      assertEquals(latencyAfter, serverEWMALatency, delta);
+    } else {
+      assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be 
greater than " + latencyBefore);
+    }
+
+    // Shut down the server
+    queryServer.shutDown();
+  }
+
+  @Test
+  public void testLatencyForClientException()
+      throws Exception {
+    long requestId = 123;
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+    Exception exception = new UnsupportedOperationException("Caught 
exception.");
+    ProcessingException processingException =
+        QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, 
exception);
+    dataTable.addException(processingException);
+    byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
+    // Start the server
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    queryServer.start();
+
+    // Send a query with client side errors.
+    Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+    AsyncQueryResponse asyncQueryResponse =
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+    Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+
+    Double latencyAfter = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+    if (latencyBefore == null) {
+      // Latency for the server with client side exception is assigned as 
serverResponse.getResponseDelayMs() and the
+      // calculated
+      // EWMA latency for the server will be at most 
serverResponse.getResponseDelayMs()
+      assertTrue(latencyAfter <= serverResponse.getResponseDelayMs());
+    } else {
+      assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be 
lesser than " + latencyBefore);
+    }
+
+    // Shut down the server
+    queryServer.shutDown();
+  }
+
+  @Test
+  public void testLatencyForMultipleExceptions()
+      throws Exception {
+    long requestId = 123;
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+    Exception exception = new UnsupportedOperationException("Caught 
exception.");
+    ProcessingException processingException =
+        QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, 
exception);
+    ProcessingException processingServerException =
+        QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, 
exception);
+    dataTable.addException(processingServerException);
+    dataTable.addException(processingException);
+    byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
+    // Start the server
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    queryServer.start();
+
+    // Send a query with multiple exceptions. Make sure that the latency is 
set to timeout value even if a single
+    //server-side exception is seen.
+    Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+    AsyncQueryResponse asyncQueryResponse =
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+    Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    Double latencyAfter = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+    if (latencyBefore == null) {
+      // This means that no queries were run before this test. So we can just 
make sure that latencyAfter is equal
+      //to 666.334.
+      // This corresponds to the EWMA value when a latency timeout value of 
1000 is set.
+      double serverEWMALatency = 666.334;
+      // Leaving an error budget of 2%
+      double delta = 13.32;
+      assertEquals(latencyAfter, serverEWMALatency, delta);
+    } else {
+      assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be 
greater than " + latencyBefore);
+    }
+
+    // Shut down the server
+    queryServer.shutDown();
+  }
+
+  @Test
+  public void testLatencyForNoException()
+      throws Exception {
+    long requestId = 123;
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+    byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
+    // Start the server
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    queryServer.start();
+
+    // Send a valid query and get latency
+    Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+    AsyncQueryResponse asyncQueryResponse =
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+    Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    Double latencyAfter = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+    if (latencyBefore == null) {
+      // Latency for the server with no exceptions is assigned as 
serverResponse.getResponseDelayMs() and the calculated
+      // EWMA latency for the server will be at most 
serverResponse.getResponseDelayMs()
+      assertTrue(latencyAfter <= serverResponse.getResponseDelayMs());
+    } else {
+      assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be 
lesser than " + latencyBefore);
+    }
+
+    // Shut down the server
+    queryServer.shutDown();
+  }
+
   @Test
   public void testNonMatchingRequestId()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to