wuchong commented on code in PR #2803:
URL: https://github.com/apache/fluss/pull/2803#discussion_r3068185847
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
 
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+        }
+        ch.pipeline().addLast("frameDecoder", frameDecoder);
     }
 
     public void addIdleStateHandler(SocketChannel ch) {
         ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
     }
+
+    /**
+     * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the cumulation buffer on the JVM
+     * heap to avoid off-heap memory pressure from large partial messages.
+     *
+     * <p>On native transports (epoll / kqueue), {@code EpollRecvByteAllocatorHandle.allocate()}
+     * always forces the read buffer to be direct regardless of the channel allocator. The default
+     * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as the cumulation. For
+     * large partial messages, the cumulation can grow up to the maximum frame size (e.g. 32 MB), so
+     * multiple connections could consume significant off-heap memory.
+     */
+    static final class HeapPreferringCumulator implements ByteToMessageDecoder.Cumulator {
Review Comment:
Could you add a comment explaining the origin of `HeapPreferringCumulator`,
specifically detailing the source class it was derived from and the
modifications made? This will help us track changes more effectively.
Besides, please add unit tests for this class. You can let AI learn from the
Netty source code to generate UTs for this class.
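This won't replace a real Netty-based UT, but the two invariants such a test should pin down can be sketched without Netty at all: after cumulating direct read buffers, the cumulation must still be heap-backed, and no bytes may be lost in the copy. A minimal self-contained sketch using plain `java.nio` (note: `cumulate` here is a simplified stand-in for the real `Cumulator.cumulate`, not Fluss code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeapCumulationSketch {

    // Simplified stand-in for the cumulate() contract: merge the incoming
    // (possibly direct) buffer into a cumulation that is always heap-backed.
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + in.remaining());
        merged.put(cumulation);
        merged.put(in);
        merged.flip();
        return merged; // ByteBuffer.allocate() yields a heap buffer, never direct
    }

    public static void main(String[] args) {
        // Native transports (epoll / kqueue) hand the decoder direct read buffers.
        ByteBuffer part1 = ByteBuffer.allocateDirect(16);
        part1.put("hello".getBytes(StandardCharsets.US_ASCII)).flip();
        ByteBuffer part2 = ByteBuffer.allocateDirect(16);
        part2.put(" world".getBytes(StandardCharsets.US_ASCII)).flip();

        ByteBuffer cumulation = ByteBuffer.allocate(0);
        cumulation = cumulate(cumulation, part1);
        cumulation = cumulate(cumulation, part2);

        // The two properties a real UT should assert:
        if (cumulation.isDirect()) {
            throw new AssertionError("cumulation escaped to off-heap memory");
        }
        byte[] bytes = new byte[cumulation.remaining()];
        cumulation.get(bytes);
        String result = new String(bytes, StandardCharsets.US_ASCII);
        if (!"hello world".equals(result)) {
            throw new AssertionError("bytes corrupted: " + result);
        }
        System.out.println("ok: " + result);
    }
}
```

A real test should additionally cover the expansion path (cumulation growing past its capacity) and reference counting of the released input buffer, which only make sense against Netty's `ByteBuf` API.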
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java:
##########
@@ -89,19 +90,19 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
         int connectTimeoutMs = (int) conf.get(ConfigOptions.CLIENT_CONNECT_TIMEOUT).toMillis();
         int connectionMaxIdle =
                 (int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
-        PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
+        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
         this.bootstrap =
                 new Bootstrap()
                         .group(eventGroup)
                         .channel(NettyUtils.getClientSocketChannelClass(eventGroup))
-                        .option(ChannelOption.ALLOCATOR, pooledAllocator)
+                        .option(ChannelOption.ALLOCATOR, allocator)
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .option(ChannelOption.SO_KEEPALIVE, true)
-                        .handler(new ClientChannelInitializer(connectionMaxIdle));
+                        .handler(new ClientChannelInitializer(connectionMaxIdle, isInnerClient));
Review Comment:
I think we should get the prefer-heap option from `Configuration`, rather
than hard-coding it based on `isInnerClient` here.
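For reference, the wiring I'd expect is roughly the following (sketch only, assuming the `NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST` option added in this PR and a matching `boolean` parameter on `ClientChannelInitializer`; untested fragment, not a drop-in suggestion):

```java
// Read the flag from Configuration instead of deriving it from isInnerClient:
boolean heapBufferFirst = conf.get(ConfigOptions.NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST);
// ...
        .handler(new ClientChannelInitializer(connectionMaxIdle, heapBufferFirst));
```

That also resolves the unused-config comment below, since the option then has a reader.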
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -931,6 +931,15 @@ public class ConfigOptions {
                             "The number of threads that the client uses for sending requests to the "
                                     + "network and receiving responses from network. The default value is 4");
+
+    public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST =
Review Comment:
This config is defined but not used anywhere. I think this configuration is
still useful, so we should support it and wire it into the client.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]