Copilot commented on code in PR #15335: URL: https://github.com/apache/pinot/pull/15335#discussion_r2037547484
########## pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java: ########## @@ -63,25 +73,63 @@ public void channelInactive(ChannelHandlerContext ctx) { ctx.fireChannelInactive(); } + /** + * Closes and removes all active channels from the map to release direct memory. + */ + private void closeAllChannels() { + LOGGER.warn("OOM detected: Closing all channels to release direct memory"); + for (SocketChannel channel : _allChannels.keySet()) { + try { + setSilentShutdown(channel); + LOGGER.info("Closing channel: {}", channel); + if (channel != null) { + channel.close(); + } + } catch (Exception e) { + LOGGER.error("Error while closing channel: {}", channel, e); + } finally { + _allChannels.remove(channel); + } + } + } + + // silent shutdown for the channels without firing channelInactive + private void setSilentShutdown(SocketChannel socketChannel) { + if (socketChannel != null) { + DirectOOMHandler directOOMHandler = socketChannel.pipeline().get(DirectOOMHandler.class); + if (directOOMHandler != null) { + directOOMHandler.setSilentShutDown(); + } + } + } + @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // catch direct memory oom here - if (cause instanceof OutOfMemoryError - && StringUtils.containsIgnoreCase(cause.getMessage(), "direct buffer")) { - BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM, 1L); + if (cause instanceof OutOfMemoryError && StringUtils.containsIgnoreCase(cause.getMessage(), "direct buffer")) { // only one thread can get here and do the shutdown if (DIRECT_OOM_SHUTTING_DOWN.compareAndSet(false, true)) { try { - LOGGER.error("Closing ALL channels to servers, as we are running out of direct memory " - + "while receiving response from {}", _serverRoutingInstance, cause); - // close all channels to servers - _serverToChannelMap.keySet().forEach(serverRoutingInstance -> { - 
ServerChannels.ServerChannel removed = _serverToChannelMap.remove(serverRoutingInstance); - removed.closeChannel(); - removed.setSilentShutdown(); - }); - _queryRouter.markServerDown(_serverRoutingInstance, - new QueryCancelledException("Query cancelled as broker is out of direct memory")); + if (_serverToChannelMap != null && !_serverToChannelMap.isEmpty()) { + LOGGER.error("Closing ALL channels to servers, as we are running out of direct memory " + + "while receiving response from {}", _serverRoutingInstance, cause); // broker side direct OOM handler + BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM, 1L); + + // close all channels to servers + _serverToChannelMap.keySet().forEach(serverRoutingInstance -> { + ServerChannels.ServerChannel removed = _serverToChannelMap.remove(serverRoutingInstance); + removed.closeChannel(); + removed.setSilentShutdown(); + }); + _queryRouter.markServerDown(_serverRoutingInstance, + new QueryCancelledException("Query cancelled as broker is out of direct memory")); + } else if (_allChannels != null && !_allChannels.isEmpty()) { // server side direct OOM handler + LOGGER.error("Closing channel from broker, as we are running out of direct memory " + + "while initiating request to server", cause); + cause.printStackTrace(); Review Comment: [nitpick] Consider removing 'cause.printStackTrace()' and relying solely on LOGGER.error to log the exception; the preceding LOGGER.error call already receives 'cause' as its final argument, so dropping the extra call keeps logging consistent in production code. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org