This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 86fe41e Timeout if waiting server channel lock takes a long time (#8083) 86fe41e is described below commit 86fe41e2e9c17090f1fa9bcc39dbd61e3b649402 Author: Liang Mingqiang <mili...@linkedin.com> AuthorDate: Tue Feb 1 10:50:17 2022 -0800 Timeout if waiting server channel lock takes a long time (#8083) - make requests be able to time out early when "waiting the channel lock". - move the request serialization logic out of critical section. --- .../apache/pinot/core/transport/QueryRouter.java | 3 +- .../pinot/core/transport/ServerChannels.java | 39 ++++++++++++++++------ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index c49ed54..40c32c8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -119,7 +119,8 @@ public class QueryRouter { ServerRoutingInstance serverRoutingInstance = entry.getKey(); ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels; try { - serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue()); + serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(), + timeoutMs); asyncQueryResponse.markRequestSubmitted(serverRoutingInstance); } catch (Exception e) { LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java index 8f70009..e5b422a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java @@ -33,6 +33,8 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMeter; @@ -53,6 +55,7 @@ import org.apache.thrift.protocol.TCompactProtocol; public class ServerChannels { private final QueryRouter _queryRouter; private final BrokerMetrics _brokerMetrics; + private final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory()); private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> _serverToChannelMap = new ConcurrentHashMap<>(); private final EventLoopGroup _eventLoopGroup = new NioEventLoopGroup(); private final TlsConfig _tlsConfig; @@ -81,10 +84,11 @@ public class ServerChannels { } public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, - ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest) + ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, long timeoutMs) throws Exception { + byte[] requestBytes = _serializer.serialize(instanceRequest); _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new) - .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, instanceRequest); + .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes, timeoutMs); } public void shutDown() { @@ -94,9 +98,10 @@ public class ServerChannels { @ThreadSafe private class ServerChannel { - final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory()); final ServerRoutingInstance _serverRoutingInstance; final Bootstrap _bootstrap; + // lock to protect channel as requests must be written into channel sequentially + final ReentrantLock _channelLock = new ReentrantLock(); Channel _channel; ServerChannel(ServerRoutingInstance serverRoutingInstance) { @@ -122,8 +127,8 @@ public class ServerChannels { private void attachSSLHandler(SocketChannel ch) { try { - SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() - .sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider())); + SslContextBuilder sslContextBuilder = + SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider())); if (_tlsConfig.getKeyStorePath() != null) { sslContextBuilder.keyManager(TlsUtils.createKeyManagerFactory(_tlsConfig)); @@ -139,8 +144,22 @@ public class ServerChannels { } } - synchronized void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, - ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest) + private void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, + ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long timeoutMs) + throws Exception { + if (_channelLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) { + try { + sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes); + } finally { + _channelLock.unlock(); + } + } else { + throw new TimeoutException("Timeout while acquiring channel lock"); + } + } + + private void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, + ServerRoutingInstance serverRoutingInstance, byte[] requestBytes) throws Exception { if (_channel == null || !_channel.isActive()) { long startTime = System.currentTimeMillis(); @@ -148,13 +167,11 @@ public class ServerChannels { _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.NETTY_CONNECTION_CONNECT_TIME_MS, System.currentTimeMillis() - startTime); } - byte[] requestBytes = _serializer.serialize(instanceRequest); long sendRequestStartTimeMs = System.currentTimeMillis(); _channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> { long requestSentLatencyMs = System.currentTimeMillis() - sendRequestStartTimeMs; - _brokerMetrics - .addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY, requestSentLatencyMs, - TimeUnit.MILLISECONDS); + _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY, + requestSentLatencyMs, TimeUnit.MILLISECONDS); asyncQueryResponse.markRequestSent(serverRoutingInstance, requestSentLatencyMs); }); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT, 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org