Copilot commented on code in PR #15335:
URL: https://github.com/apache/pinot/pull/15335#discussion_r2037547484


##########
pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java:
##########
@@ -63,25 +73,63 @@ public void channelInactive(ChannelHandlerContext ctx) {
     ctx.fireChannelInactive();
   }
 
+  /**
+   * Closes and removes all active channels from the map to release direct 
memory.
+   */
+  private void closeAllChannels() {
+    LOGGER.warn("OOM detected: Closing all channels to release direct memory");
+    for (SocketChannel channel : _allChannels.keySet()) {
+      try {
+        setSilentShutdown(channel);
+        LOGGER.info("Closing channel: {}", channel);
+        if (channel != null) {
+          channel.close();
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error while closing channel: {}", channel, e);
+      } finally {
+        _allChannels.remove(channel);
+      }
+    }
+  }
+
+  // silent shutdown for the channels without firing channelInactive
+  private void setSilentShutdown(SocketChannel socketChannel) {
+    if (socketChannel != null) {
+      DirectOOMHandler directOOMHandler = 
socketChannel.pipeline().get(DirectOOMHandler.class);
+      if (directOOMHandler != null) {
+        directOOMHandler.setSilentShutDown();
+      }
+    }
+  }
+
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     // catch direct memory oom here
-    if (cause instanceof OutOfMemoryError
-        && StringUtils.containsIgnoreCase(cause.getMessage(), "direct 
buffer")) {
-      BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM, 
1L);
+    if (cause instanceof OutOfMemoryError && 
StringUtils.containsIgnoreCase(cause.getMessage(), "direct buffer")) {
       // only one thread can get here and do the shutdown
       if (DIRECT_OOM_SHUTTING_DOWN.compareAndSet(false, true)) {
         try {
-          LOGGER.error("Closing ALL channels to servers, as we are running out 
of direct memory "
-              + "while receiving response from {}", _serverRoutingInstance, 
cause);
-          // close all channels to servers
-          _serverToChannelMap.keySet().forEach(serverRoutingInstance -> {
-            ServerChannels.ServerChannel removed = 
_serverToChannelMap.remove(serverRoutingInstance);
-            removed.closeChannel();
-            removed.setSilentShutdown();
-          });
-          _queryRouter.markServerDown(_serverRoutingInstance,
-              new QueryCancelledException("Query cancelled as broker is out of 
direct memory"));
+          if (_serverToChannelMap != null && !_serverToChannelMap.isEmpty()) {
+            LOGGER.error("Closing ALL channels to servers, as we are running 
out of direct memory "
+                + "while receiving response from {}", _serverRoutingInstance, 
cause); // broker side direct OOM handler
+            
BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM, 1L);
+
+            // close all channels to servers
+            _serverToChannelMap.keySet().forEach(serverRoutingInstance -> {
+              ServerChannels.ServerChannel removed = 
_serverToChannelMap.remove(serverRoutingInstance);
+              removed.closeChannel();
+              removed.setSilentShutdown();
+            });
+            _queryRouter.markServerDown(_serverRoutingInstance,
+                new QueryCancelledException("Query cancelled as broker is out 
of direct memory"));
+          } else if (_allChannels != null && !_allChannels.isEmpty()) { // 
server side direct OOM handler
+            LOGGER.error("Closing channel from broker, as we are running out 
of direct memory "
+                + "while initiating request to server", cause);
+            cause.printStackTrace();

Review Comment:
   [nitpick] Consider removing 'cause.printStackTrace()' and relying solely on 
LOGGER.error, which is already passed `cause`, to log the exception; this keeps 
logging consistent in production code.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to