walterddr commented on code in PR #10761:
URL: https://github.com/apache/pinot/pull/10761#discussion_r1192826109


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/ExchangeService.java:
##########
@@ -0,0 +1,60 @@
+package org.apache.pinot.query.runtime.operator.exchange;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExchangeService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ExchangeService.class);
+  private static final int DANGLING_EXCHANGE_EXPIRY_SECONDS = 300;
+
+  private final Cache<OpChainId, Future<?>> _submittedExchangeCache =
+      
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_EXCHANGE_EXPIRY_SECONDS, 
TimeUnit.SECONDS)
+          .removalListener((RemovalListener<OpChainId, Future<?>>) 
notification -> {
+            if (notification.wasEvicted()) {
+              Future<?> future = notification.getValue();
+              if (!future.isDone()) {
+                LOGGER.warn("Evicting dangling exchange request for {}}", 
notification.getKey());
+                future.cancel(true);
+              }
+            }
+          }).build();
+  private final ExecutorService _exchangeExecutor;
+
+  public ExchangeService(String hostname, int port, PinotConfiguration config) 
{
+    _exchangeExecutor = Executors.newCachedThreadPool();
+    LOGGER.info("Initialized ExchangeService with hostname: {}, port: {}", 
hostname, port);
+  }
+
+  /**
+   * submit a block exchange to sending service for a single OpChain.
+   *
+   * Notice that the logic inside the {@link BlockExchange#send()} should 
guarantee the submitted Runnable object
+   *     to be terminated successfully or after opChain timeout.
+   *
+   * @param blockExchange the exchange object of the OpChain with all the 
pending data to be sent.
+   */
+  public void submitExchangeRequest(OpChainId opChainId, BlockExchange 
blockExchange) {
+    _submittedExchangeCache.put(opChainId, _exchangeExecutor.submit(() -> {
+      TransferableBlock block = blockExchange.send();
+      while (!TransferableBlockUtils.isEndOfStream(block)) {
+        block = blockExchange.send();
+      }
+    }));

Review Comment:
   This will submit the send exchange to another thread pool for execution.
   - If any of the gRPC/in-memory channels is blocked, it should propagate that
condition back to the BlockExchange class and cause it to slow down producing
new TransferableBlocks.
   - Any time a block is sent successfully, it will trigger the opChain to
produce more blocks (if it is not already in a running state).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to