This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d0b4f234e5 Make thrift deserializer threadsafe (#9299) d0b4f234e5 is described below commit d0b4f234e5f265f159ce607818c640ab11b2c3aa Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Tue Aug 30 22:32:38 2022 +0530 Make thrift deserializer threadsafe (#9299) --- .../pinot/core/transport/InstanceRequestHandler.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index 1c3e408744..43e9264007 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -73,7 +73,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100; private final String _instanceName; - private final TDeserializer _deserializer; + private final ThreadLocal<TDeserializer> _deserializer; private final QueryScheduler _queryScheduler; private final ServerMetrics _serverMetrics; private final AccessControl _accessControl; @@ -85,11 +85,14 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> _queryScheduler = queryScheduler; _serverMetrics = serverMetrics; _accessControl = accessControl; - try { - _deserializer = new TDeserializer(new TCompactProtocol.Factory()); - } catch (TTransportException e) { - throw new RuntimeException("Failed to initialize Thrift Deserializer", e); - } + _deserializer = ThreadLocal.withInitial(() -> { + try { + return new TDeserializer(new TCompactProtocol.Factory()); + } catch (TTransportException e) { + throw new RuntimeException("Failed to initialize Thrift Deserializer", e); + } + }); + if (Boolean.parseBoolean(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION))) { _queryFuturesById = new ConcurrentHashMap<>(); LOGGER.info("Enable query cancellation"); @@ -135,7 +138,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> // Parse instance request into ServerQueryRequest. msg.readBytes(requestBytes); - _deserializer.deserialize(instanceRequest, requestBytes); + _deserializer.get().deserialize(instanceRequest, requestBytes); queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs); queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs) .stopAndRecord(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org