This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e307106a59 AdaptiveServerSelection: update response stats for servers that have not responded (#9801) e307106a59 is described below commit e307106a59646a8e1d9475baabe66fa3cde09490 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Mon Nov 14 23:57:58 2022 -0800 AdaptiveServerSelection: update response stats for servers that have not responded (#9801) --- .../pinot/core/transport/AsyncQueryResponse.java | 26 +++++++++++- .../apache/pinot/core/transport/QueryRouter.java | 13 +----- .../pinot/core/transport/QueryRoutingTest.java | 48 +++++++++++++++++++++- 3 files changed, 72 insertions(+), 15 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index 7924a9afdb..3aecb184be 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; /** @@ -43,12 +44,13 @@ public class AsyncQueryResponse implements QueryResponse { private final CountDownLatch _countDownLatch; private final long _maxEndTimeMs; private final long _timeoutMs; + private final ServerRoutingStatsManager _serverRoutingStatsManager; private volatile ServerRoutingInstance _failedServer; private volatile Exception _exception; public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set<ServerRoutingInstance> serversQueried, - long startTimeMs, long timeoutMs) { + long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) { _queryRouter = queryRouter; _requestId = requestId; int numServersQueried = serversQueried.size(); @@ -59,6 +61,7 @@ public class AsyncQueryResponse implements QueryResponse { _countDownLatch = new CountDownLatch(numServersQueried); _timeoutMs = timeoutMs; _maxEndTimeMs = startTimeMs + timeoutMs; + _serverRoutingStatsManager = serverRoutingStatsManager; } @Override @@ -84,6 +87,17 @@ public class AsyncQueryResponse implements QueryResponse { _status.compareAndSet(Status.IN_PROGRESS, finish ? Status.COMPLETED : Status.TIMED_OUT); return _responseMap; } finally { + // Update ServerRoutingStats. + for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : _responseMap.entrySet()) { + ServerResponse response = entry.getValue(); + if (response == null || response.getDataTable() == null) { + // These are servers from which a response was not received. So update query response stats for such + // servers with maximum latency i.e timeout value. + _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, entry.getKey().getInstanceId(), + _timeoutMs); + } + } + _queryRouter.markQueryDone(_requestId); } } @@ -134,7 +148,15 @@ public class AsyncQueryResponse implements QueryResponse { void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize, int deserializationTimeMs) { - _responseMap.get(serverRoutingInstance).receiveDataTable(dataTable, responseSize, deserializationTimeMs); + ServerResponse response = _responseMap.get(serverRoutingInstance); + response.receiveDataTable(dataTable, responseSize, deserializationTimeMs); + + // Record query completion stats immediately after receiving the response from the server instead of waiting + // for all servers to respond. This helps to keep the stats up-to-date. + long latencyMs = response.getResponseDelayMs(); + _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, serverRoutingInstance.getInstanceId(), + latencyMs); + _numServersResponded.getAndIncrement(); _countDownLatch.countDown(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 7a13035eae..e0ff080e11 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -116,7 +116,8 @@ public class QueryRouter { // Create the asynchronous query response with the request map AsyncQueryResponse asyncQueryResponse = - new AsyncQueryResponse(this, requestId, requestMap.keySet(), System.currentTimeMillis(), timeoutMs); + new AsyncQueryResponse(this, requestId, requestMap.keySet(), System.currentTimeMillis(), timeoutMs, + _serverRoutingStatsManager); _asyncQueryResponseMap.put(requestId, asyncQueryResponse); for (Map.Entry<ServerRoutingInstance, InstanceRequest> entry : requestMap.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey(); @@ -149,8 +150,6 @@ public class QueryRouter { AsyncQueryResponse asyncQueryResponse, Exception e) { LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId, serverRoutingInstance, e); - _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, serverRoutingInstance.getInstanceId(), - (int) asyncQueryResponse.getTimeoutMs()); asyncQueryResponse.markQueryFailed(serverRoutingInstance, e); } @@ -183,20 +182,12 @@ public class QueryRouter { // Query future might be null if the query is already done (maybe due to failure) if (asyncQueryResponse != null) { asyncQueryResponse.receiveDataTable(serverRoutingInstance, dataTable, responseSize, deserializationTimeMs); - - // Record query completion stats immediately after receiving the response from the server instead of waiting - // for the reduce phase. - long latencyMs = asyncQueryResponse.getServerResponseDelayMs(serverRoutingInstance); - _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, serverRoutingInstance.getInstanceId(), - latencyMs); } } void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception exception) { for (AsyncQueryResponse asyncQueryResponse : _asyncQueryResponseMap.values()) { asyncQueryResponse.markServerDown(serverRoutingInstance, exception); - _serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(), - serverRoutingInstance.getInstanceId(), (int) asyncQueryResponse.getTimeoutMs()); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 9fdaf7fb7d..5e1fa04176 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.transport; import com.google.common.util.concurrent.Futures; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.common.datatable.DataTable; @@ -33,7 +34,9 @@ import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -60,10 +63,18 @@ public class QueryRoutingTest { Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList()); private QueryRouter _queryRouter; + private ServerRoutingStatsManager _serverRoutingStatsManager; + int _requestCount; @BeforeClass public void setUp() { - _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), mock(ServerRoutingStatsManager.class)); + Map<String, Object> properties = new HashMap<>(); + properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true); + PinotConfiguration cfg = new PinotConfiguration(properties); + _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + _serverRoutingStatsManager.init(); + _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), _serverRoutingStatsManager); + _requestCount = 0; } private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) { @@ -88,6 +99,7 @@ public class QueryRoutingTest { DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(0, responseBytes); @@ -95,13 +107,17 @@ public class QueryRoutingTest { // OFFLINE only AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 600_000L); Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); assertNotNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseSize(), responseBytes.length); + // 2 requests - query submit and query response. + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // REALTIME only asyncQueryResponse = @@ -112,6 +128,9 @@ public class QueryRoutingTest { serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE); assertNotNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseSize(), responseBytes.length); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Hybrid asyncQueryResponse = @@ -127,6 +146,9 @@ public class QueryRoutingTest { serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE); assertNotNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseSize(), responseBytes.length); + _requestCount += 4; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Shut down the server queryServer.shutDown(); @@ -136,6 +158,7 @@ public class QueryRoutingTest { public void testInvalidResponse() throws Exception { long requestId = 123; + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(0, new byte[0]); @@ -154,6 +177,9 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should time out assertTrue(System.currentTimeMillis() - startTimeMs >= 1000); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Shut down the server queryServer.shutDown(); @@ -166,6 +192,7 @@ public class QueryRoutingTest { DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(0, responseBytes); @@ -184,6 +211,9 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should time out assertTrue(System.currentTimeMillis() - startTimeMs >= 1000); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Shut down the server queryServer.shutDown(); @@ -199,6 +229,7 @@ public class QueryRoutingTest { DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(500, responseBytes); @@ -221,6 +252,10 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should early terminate assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); + // Submit query after server is down startTimeMs = System.currentTimeMillis(); @@ -237,6 +272,15 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should early terminate assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); + } + + private void waitForStatsUpdate(long taskCount) { + TestUtils.waitForCondition(aVoid -> { + return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount); + }, 5L, 5000, "Failed to record stats for AdaptiveServerSelectorTest"); } @AfterClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org