This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e62a271221 Fix direct memory OOM on broker (#11496) e62a271221 is described below commit e62a2712210b16020b8f3e3c0de9578cb017909c Author: Jia Guo <jia...@linkedin.com> AuthorDate: Tue Sep 12 14:23:33 2023 -0700 Fix direct memory OOM on broker (#11496) * close channel on direct oom, log large response * Trigger Test * close channel on direct oom, log large response * close channel on direct oom, log large response * Trigger Test * close channel on direct oom, log large response * move metrics send to reflect all direct oom incidents --- .../apache/pinot/common/metrics/BrokerMeter.java | 3 +- .../apache/pinot/common/metrics/ServerMeter.java | 3 +- .../core/transport/ChannelHandlerFactory.java | 6 ++ .../pinot/core/transport/DirectOOMHandler.java | 95 ++++++++++++++++++++++ .../core/transport/InstanceRequestHandler.java | 6 ++ .../pinot/core/transport/ServerChannels.java | 24 +++++- 6 files changed, 132 insertions(+), 5 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 7854e80806..bde0f3c409 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -98,7 +98,8 @@ public enum BrokerMeter implements AbstractMetrics.Meter { NETTY_CONNECTION_BYTES_SENT("nettyConnection", true), NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true), - PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true); + PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true), + DIRECT_MEMORY_OOM("directMemoryOOMCount", true); private final String _brokerMeterName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 4681b41659..09701ed335 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -102,7 +102,8 @@ public enum ServerMeter implements AbstractMetrics.Meter { NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedInvalid", false), NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", false), - NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),; + NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false), + LARGE_QUERY_RESPONSES_SENT("largeResponses", false); private final String _meterName; private final String _unit; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java index 94b1c3c1d9..d38e53957e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; +import java.util.concurrent.ConcurrentHashMap; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerMetrics; @@ -87,4 +88,9 @@ public class ChannelHandlerFactory { QueryScheduler queryScheduler, ServerMetrics serverMetrics, AccessControl accessControl) { return new InstanceRequestHandler(instanceName, config, queryScheduler, serverMetrics, accessControl); } + + public static ChannelHandler getDirectOOMHandler(QueryRouter queryRouter, ServerRoutingInstance serverRoutingInstance, + ConcurrentHashMap<ServerRoutingInstance, 
ServerChannels.ServerChannel> serverToChannelMap) { + return new DirectOOMHandler(queryRouter, serverRoutingInstance, serverToChannelMap); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java new file mode 100644 index 0000000000..fad085a02e --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.transport; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.spi.exception.QueryCancelledException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handling netty direct memory OOM. In this case there is a great chance that multiple channels are receiving + * large data tables from servers concurrently. 
We want to close all channels to servers to proactively release + * the direct memory, because the execution of netty threads can all block in allocating direct memory, in which case + * no one will reach channelRead0. + */ +public class DirectOOMHandler extends ChannelInboundHandlerAdapter { + private static final Logger LOGGER = LoggerFactory.getLogger(DataTableHandler.class); + private static final AtomicBoolean DIRECT_OOM_SHUTTING_DOWN = new AtomicBoolean(false); + private final QueryRouter _queryRouter; + private final ServerRoutingInstance _serverRoutingInstance; + private final ConcurrentHashMap<ServerRoutingInstance, ServerChannels.ServerChannel> _serverToChannelMap; + private volatile boolean _silentShutDown = false; + + public DirectOOMHandler(QueryRouter queryRouter, ServerRoutingInstance serverRoutingInstance, + ConcurrentHashMap<ServerRoutingInstance, ServerChannels.ServerChannel> serverToChannelMap) { + _queryRouter = queryRouter; + _serverRoutingInstance = serverRoutingInstance; + _serverToChannelMap = serverToChannelMap; + } + + public void setSilentShutDown() { + _silentShutDown = true; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + // if we are shutting down channels due to direct memory OOM, we short circuit the channel inactive + if (_silentShutDown) { + return; + } + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // catch direct memory oom here + if (cause instanceof OutOfMemoryError && cause.getMessage().contains("Direct buffer")) { + BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM, 1L); + // only one thread can get here and do the shutdown + if (DIRECT_OOM_SHUTTING_DOWN.compareAndSet(false, true)) { + try { + LOGGER.error("Closing ALL channels to servers, as we are running out of direct memory " + + "while receiving response from {}", _serverRoutingInstance, cause); + // close all channels to 
servers + _serverToChannelMap.keySet().forEach(serverRoutingInstance -> { + ServerChannels.ServerChannel removed = _serverToChannelMap.remove(serverRoutingInstance); + removed.closeChannel(); + removed.setSilentShutdown(); + }); + _queryRouter.markServerDown(_serverRoutingInstance, + new QueryCancelledException("Query cancelled as broker is out of direct memory")); + } catch (Exception e) { + LOGGER.error("Caught exception while handling direct memory OOM", e); + } finally { + DIRECT_OOM_SHUTTING_DOWN.set(false); + } + } else { + LOGGER.warn("Caught direct memory OOM, but another thread is already handling it", cause); + } + } else { + ctx.fireExceptionCaught(cause); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index e64322e711..4940823b74 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -71,6 +71,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> // TODO: make it configurable private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100; + private static final int LARGE_RESPONSE_SIZE_THRESHOLD_BYTES = 100 * 1024 * 1024; // 100 MB private final String _instanceName; private final ThreadLocal<TDeserializer> _deserializer; @@ -321,6 +322,11 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> LOGGER.info("Slow query: request handler processing time: {}, send response latency: {}, total time to handle " + "request: {}", queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs); } + if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) { + LOGGER.warn("Large query: response size in bytes: {}, table name {}", + serializedDataTable.length, tableNameWithType); + 
ServerMetrics.get().addMeteredTableValue(tableNameWithType, ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1); + } }); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java index f9ed68b08f..82bbf64333 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java @@ -142,7 +142,7 @@ public class ServerChannels { } @ThreadSafe - private class ServerChannel { + class ServerChannel { final ServerRoutingInstance _serverRoutingInstance; final Bootstrap _bootstrap; // lock to protect channel as requests must be written into channel sequentially @@ -164,14 +164,32 @@ public class ServerChannels { ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder()); ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender()); + ch.pipeline().addLast( + ChannelHandlerFactory.getDirectOOMHandler(_queryRouter, _serverRoutingInstance, _serverToChannelMap) + ); // NOTE: data table de-serialization happens inside this handler // Revisit if this becomes a bottleneck - ch.pipeline().addLast( - ChannelHandlerFactory.getDataTableHandler(_queryRouter, _serverRoutingInstance, _brokerMetrics)); + ch.pipeline().addLast(ChannelHandlerFactory + .getDataTableHandler(_queryRouter, _serverRoutingInstance, _brokerMetrics)); } }); } + void closeChannel() { + if (_channel != null) { + _channel.close(); + } + } + + void setSilentShutdown() { + if (_channel != null) { + DirectOOMHandler directOOMHandler = _channel.pipeline().get(DirectOOMHandler.class); + if (directOOMHandler != null) { + directOOMHandler.setSilentShutDown(); + } + } + } + void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long timeoutMs) throws InterruptedException, TimeoutException { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org