This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 98d569d Always return a response from query execution. (#6596) 98d569d is described below commit 98d569db6deaab6ece09c08ece5e29a33eceab0f Author: Amrish Lal <amrish.k....@gmail.com> AuthorDate: Wed Feb 24 20:49:09 2021 -0800 Always return a response from query execution. (#6596) * Always return a response from query execution. * Cleanup. * Handle all exceptions in channelRead0 method. * codereview changes. * codereview changes. * Rebuild. * Add test case. * Add test case. * code review changes. * codereview changes. * code review changes. * Cleanup comments. * Rebuild. --- .../apache/pinot/common/metrics/ServerMeter.java | 1 - .../pinot/core/query/scheduler/QueryScheduler.java | 4 + .../core/transport/InstanceRequestHandler.java | 150 +++++++++++++++------ .../tests/OfflineClusterIntegrationTest.java | 24 ++++ 4 files changed, 135 insertions(+), 44 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 39cc24e..37e166d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -27,7 +27,6 @@ import org.apache.pinot.common.Utils; public enum ServerMeter implements AbstractMetrics.Meter { QUERIES("queries", true), UNCAUGHT_EXCEPTIONS("exceptions", true), - REQUEST_FETCH_EXCEPTIONS("exceptions", true), REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true), RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true), SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 69c43de..6a19fbf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -306,6 +306,10 @@ public abstract class QueryScheduler { protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest, ProcessingException error) { DataTable result = new DataTableImplV2(); + + Map<String, String> dataTableMetadata = result.getMetadata(); + dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(queryRequest.getRequestId())); + result.addException(error); return Futures.immediateFuture(serializeDataTable(queryRequest, result)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index 3744326..bdcac02 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -25,17 +25,22 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerQueryPhase; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.request.InstanceRequest; -import org.apache.pinot.spi.utils.BytesUtils; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.datatable.DataTableImplV2; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.QueryScheduler; +import org.apache.pinot.spi.utils.BytesUtils; import org.apache.thrift.TDeserializer; +import 
org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,67 +65,126 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> _serverMetrics = serverMetrics; } + /** + * Always return a response even when query execution throws exception; otherwise, broker + * will keep waiting until timeout. + */ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { - long queryArrivalTimeMs = System.currentTimeMillis(); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1); - int requestSize = msg.readableBytes(); - _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize); - byte[] requestBytes = new byte[requestSize]; - msg.readBytes(requestBytes); - - InstanceRequest instanceRequest = new InstanceRequest(); + long queryArrivalTimeMs = 0; + InstanceRequest instanceRequest = null; + byte[] requestBytes = null; + try { + // Put all code inside try block to catch all exceptions. + int requestSize = msg.readableBytes(); + + instanceRequest = new InstanceRequest(); + ServerQueryRequest queryRequest; + requestBytes = new byte[requestSize]; + + queryArrivalTimeMs = System.currentTimeMillis(); + _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1); + _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize); + + // Parse instance request into ServerQueryRequest. + msg.readBytes(requestBytes); _deserializer.deserialize(instanceRequest, requestBytes); + queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs); + queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs) + .stopAndRecord(); + + // Submit query for execution and register callback for execution results. 
+ Futures.addCallback(_queryScheduler.submit(queryRequest), + createCallback(ctx, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor()); } catch (Exception e) { - LOGGER - .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes), - e); - _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1); - return; - } + if (e instanceof TException) { + // Deserialization exception + _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1); + } - ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs); - queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs) - .stopAndRecord(); + // Send error response + String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : ""; + long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0; + LOGGER.error("Exception while processing instance request: {}", hexString, e); + sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new DataTableImplV2(), e); + } + } - // NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version - Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() { + private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, long queryArrivalTimeMs, + InstanceRequest instanceRequest, ServerQueryRequest queryRequest) { + return new FutureCallback<byte[]>() { @Override public void onSuccess(@Nullable byte[] responseBytes) { - // NOTE: response bytes can be null if data table serialization throws exception if (responseBytes != null) { - long sendResponseStartTimeMs = System.currentTimeMillis(); - int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs); - ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> { 
- long sendResponseEndTimeMs = System.currentTimeMillis(); - int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs); - _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1); - _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length); - _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs, - TimeUnit.MILLISECONDS); - - int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs); - if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) { - LOGGER.info( - "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}", - queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs); - } - }); + // responseBytes contains either query results or exception. + sendResponse(ctx, queryArrivalTimeMs, responseBytes); + } else { + // Send exception response. + sendErrorResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), + new Exception("Null query response.")); } } @Override public void onFailure(Throwable t) { - LOGGER.error("Caught exception while processing instance request", t); - _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1); + // Send exception response. + LOGGER.error("Exception while processing instance request", t); + sendErrorResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), + new Exception(t)); } - }, MoreExecutors.directExecutor()); + }; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOGGER.error("Caught exception while fetching instance request", cause); - _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1); + // All exceptions should be caught and handled in channelRead0 method. 
This is a fallback method that + // will only be called if for some remote reason we are unable to handle exceptions in channelRead0. + String message = "Unhandled Exception in " + getClass().getCanonicalName(); + LOGGER.error(message, cause); + sendErrorResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(message, cause)); + } + + /** + * Send an exception back to broker as response to the query request. + */ + private void sendErrorResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs, + DataTable dataTable, Exception e) { + try { + Map<String, String> dataTableMetadata = dataTable.getMetadata(); + dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId)); + dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + byte[] serializedDataTable = dataTable.toBytes(); + sendResponse(ctx, queryArrivalTimeMs, serializedDataTable); + } catch (Exception exception) { + LOGGER.error("Exception while sending query processing error to Broker.", exception); + } finally { + // Log query processing exception + LOGGER.error("Query processing error: ", e); + _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1); + } + } + + /** + * Send a response (either query results or exception) back to broker as response to the query request. 
+ */ + private void sendResponse(ChannelHandlerContext ctx, long queryArrivalTimeMs, byte[] serializedDataTable) { + long sendResponseStartTimeMs = System.currentTimeMillis(); + int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs); + ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> { + long sendResponseEndTimeMs = System.currentTimeMillis(); + int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs); + _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1); + _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length); + _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs, + TimeUnit.MILLISECONDS); + + int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs); + if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) { + LOGGER.info( + "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}", + queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs); + } + }); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index b1bb207..00876d4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.validation.constraints.AssertTrue; import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; import org.apache.pinot.common.exception.QueryException; @@ -438,6 +439,29 @@ public class 
OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet }, 600_000L, "Failed to generate bloom filter"); } + /** Check if server returns error response quickly without timing out Broker. */ + @Test + public void testServerErrorWithBrokerTimeout() + throws Exception { + // Set query timeout + long queryTimeout = 5000; + TableConfig tableConfig = getOfflineTableConfig(); + tableConfig.setQueryConfig(new QueryConfig(queryTimeout)); + updateTableConfig(tableConfig); + + long startTime = System.currentTimeMillis(); + // The query below will fail execution due to use of double quotes around value in IN clause. + JsonNode queryResponse = postSqlQuery("SELECT count(*) FROM mytable WHERE Dest IN (\"DFW\")"); + String result = queryResponse.toPrettyString(); + + assertTrue(System.currentTimeMillis() - startTime < queryTimeout); + assertTrue(queryResponse.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError")); + + // Remove timeout + tableConfig.setQueryConfig(null); + updateTableConfig(tableConfig); + } + @Test public void testStarTreeTriggering() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org