This is an automated email from the ASF dual-hosted git repository. vvivekiyer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3452ef9fa9 Improve Adaptive Server Selection to penalize servers returning server side exceptions (#14029) 3452ef9fa9 is described below commit 3452ef9fa9035134ad90d9da8a4f72acd5081c7f Author: Kirupha Balasubramanian <kiruphabal...@gmail.com> AuthorDate: Mon Sep 30 11:03:21 2024 -0700 Improve Adaptive Server Selection to penalize servers returning server side exceptions (#14029) * PINOT-19249 ADSS penalize server with hardware issues WIP push PINOT-19249 Adding more unit test cases PINOT-19249 Addressing comments and changing code to cover edge cases PINOT-19249 Code changes based on comments PINOT-19249 Fixing test cases for ADSS penalizing servers with exceptions * Fixing linter * Empty-Commit * Review comments - Adding more comments and delta to assertEquals --- .../pinot/core/transport/AsyncQueryResponse.java | 37 ++++- .../pinot/core/transport/QueryRoutingTest.java | 176 +++++++++++++++++++++ 2 files changed, 208 insertions(+), 5 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index 2ec90ab3b9..a9546cc054 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -96,11 +97,14 @@ public class AsyncQueryResponse implements QueryResponse { // servers even if the query times out or if servers have not responded. 
for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : _responseMap.entrySet()) { ServerResponse response = entry.getValue(); - - // ServerResponse returns -1 if responseDelayMs is not set. This indicates that a response was not received - // from the server. Hence we set the latency to the timeout value. - long latency = - (response != null && response.getResponseDelayMs() >= 0) ? response.getResponseDelayMs() : _timeoutMs; + long latency; + + // If server has not responded or if the server response has exceptions, the latency is set to timeout + if (hasServerNotResponded(response) || hasServerReturnedExceptions(response)) { + latency = _timeoutMs; + } else { + latency = response.getResponseDelayMs(); + } _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, entry.getKey().getInstanceId(), latency); } @@ -108,6 +112,29 @@ public class AsyncQueryResponse implements QueryResponse { } } + private boolean hasServerReturnedExceptions(ServerResponse response) { + if (response.getDataTable() != null && response.getDataTable().getExceptions().size() > 0) { + DataTable dataTable = response.getDataTable(); + Map<Integer, String> exceptions = dataTable.getExceptions(); + + // If Server response has exceptions in Datatable set the latency for timeout value. + for (Map.Entry<Integer, String> exception : exceptions.entrySet()) { + // Check if the exceptions received are server side exceptions + if (!QueryException.isClientError(exception.getKey())) { + return true; + } + } + return false; + } + return false; + } + + private boolean hasServerNotResponded(ServerResponse response) { + // ServerResponse returns -1 if responseDelayMs is not set. This indicates that a response was not received + // from the server. Hence we set the latency to the timeout value. 
+ return response == null || response.getResponseDelayMs() < 0; + } + @Override public String getServerStats() { StringBuilder stringBuilder = new StringBuilder( diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index cec413e424..1b32149d06 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -27,9 +27,11 @@ import java.util.Map; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; @@ -201,6 +203,180 @@ public class QueryRoutingTest { queryServer.shutDown(); } + @Test + public void testLatencyForQueryServerException() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Exception exception = new UnsupportedOperationException("Caught exception."); + ProcessingException processingException = + QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, exception); + dataTable.addException(processingException); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + 
queryServer.start(); + + // Send a query with ServerSide exception and check if the latency is set to timeout value. + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // This means that no queries were run before this test. So we can just make sure that latencyAfter is equal to + //666.334. + // This corresponds to the EWMA value when a latency timeout value of 1000 is set. Latency set to timeout value + //when server side exception occurs. 
+ double serverEWMALatency = 666.334; + // Leaving an error budget of 2% + double delta = 13.32; + assertEquals(latencyAfter, serverEWMALatency, delta); + } else { + assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be greater than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + + @Test + public void testLatencyForClientException() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Exception exception = new UnsupportedOperationException("Caught exception."); + ProcessingException processingException = + QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, exception); + dataTable.addException(processingException); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a query with client side errors. 
+ Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // Latency for the server with client side exception is assigned as serverResponse.getResponseDelayMs() and the + //calculated + // EWMLatency for the server will be less than serverResponse.getResponseDelayMs() + assertTrue(latencyAfter <= serverResponse.getResponseDelayMs()); + } else { + assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be lesser than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + + @Test + public void testLatencyForMultipleExceptions() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Exception exception = new UnsupportedOperationException("Caught exception."); + ProcessingException processingException = + QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, exception); + ProcessingException processingServerException = + QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, exception); + dataTable.addException(processingServerException); + dataTable.addException(processingException); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer 
queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a query with multiple exceptions. Make sure that the latency is set to timeout value even if a single + //server-side exception is seen. + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // This means that no queries were run before this test. So we can just make sure that latencyAfter is equal + //to 666.334. + // This corresponds to the EWMA value when a latency timeout value of 1000 is set. 
+ double serverEWMALatency = 666.334; + // Leaving an error budget of 2% + double delta = 13.32; + assertEquals(latencyAfter, serverEWMALatency, delta); + } else { + assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be greater than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + + @Test + public void testLatencyForNoException() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a valid query and get latency + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // Latency for the server with no exceptions is assigned as serverResponse.getResponseDelayMs() and the calculated + // EWMLatency for the server will be less than serverResponse.getResponseDelayMs() + assertTrue(latencyAfter <= serverResponse.getResponseDelayMs()); + } else { + assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be lesser than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + @Test public void 
testNonMatchingRequestId() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org