This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0b4f234e5 Make thrift deserializer threadsafe (#9299)
d0b4f234e5 is described below

commit d0b4f234e5f265f159ce607818c640ab11b2c3aa
Author: Saurabh Dubey <saurabhd...@gmail.com>
AuthorDate: Tue Aug 30 22:32:38 2022 +0530

    Make thrift deserializer threadsafe (#9299)
---
 .../pinot/core/transport/InstanceRequestHandler.java    | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 1c3e408744..43e9264007 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -73,7 +73,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
   private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
 
   private final String _instanceName;
-  private final TDeserializer _deserializer;
+  private final ThreadLocal<TDeserializer> _deserializer;
   private final QueryScheduler _queryScheduler;
   private final ServerMetrics _serverMetrics;
   private final AccessControl _accessControl;
@@ -85,11 +85,14 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
     _queryScheduler = queryScheduler;
     _serverMetrics = serverMetrics;
     _accessControl = accessControl;
-    try {
-      _deserializer = new TDeserializer(new TCompactProtocol.Factory());
-    } catch (TTransportException e) {
-      throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
-    }
+    _deserializer = ThreadLocal.withInitial(() -> {
+      try {
+        return new TDeserializer(new TCompactProtocol.Factory());
+      } catch (TTransportException e) {
+        throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
+      }
+    });
+
     if (Boolean.parseBoolean(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION))) {
       _queryFuturesById = new ConcurrentHashMap<>();
       LOGGER.info("Enable query cancellation");
@@ -135,7 +138,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
 
       // Parse instance request into ServerQueryRequest.
       msg.readBytes(requestBytes);
-      _deserializer.deserialize(instanceRequest, requestBytes);
+      _deserializer.get().deserialize(instanceRequest, requestBytes);
       queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
       queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
           .stopAndRecord();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to