wuchong commented on code in PR #2803:
URL: https://github.com/apache/fluss/pull/2803#discussion_r3068185847


##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
 
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+        }
+        ch.pipeline().addLast("frameDecoder", frameDecoder);
     }
 
     public void addIdleStateHandler(SocketChannel ch) {
         ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
     }
+
+    /**
+     * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the cumulation buffer on the JVM
+     * heap to avoid off-heap memory pressure from large partial messages.
+     *
+     * <p>On native transports (epoll / kqueue), {@code EpollRecvByteAllocatorHandle.allocate()}
+     * always forces the read buffer to be direct regardless of the channel allocator. The default
+     * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as the cumulation. For
+     * large partial messages, the cumulation can grow up to the maximum frame size (e.g. 32 MB), so
+     * multiple connections could consume significant off-heap memory.
+     */
+    static final class HeapPreferringCumulator implements ByteToMessageDecoder.Cumulator {

Review Comment:
   Could you add a comment explaining the origin of `HeapPreferringCumulator`, 
specifically the source class it was derived from and the modifications made 
to it? This will help us track changes more effectively.

   Also, please add unit tests for this class. One option is to have an AI tool 
study the Netty source code and draft the unit tests from it.
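
   To make the intended behavior concrete, here is a minimal sketch of the heap-preferring idea. It deliberately uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, so the class name `HeapCumulationSketch` and its `cumulate` method are hypothetical and are not the PR's implementation. The property a unit test would most likely want to assert is the same, though: after merging a direct input buffer, the cumulation is still on the heap and holds all the bytes in order.

```java
import java.nio.ByteBuffer;

/**
 * Illustration only: mimics the heap-preferring idea with java.nio.ByteBuffer.
 * This is NOT the PR's HeapPreferringCumulator and does not use Netty's ByteBuf API.
 */
final class HeapCumulationSketch {

    private HeapCumulationSketch() {}

    /**
     * Copies the readable bytes of {@code cumulation} (may be null) followed by the
     * readable bytes of {@code in} into a freshly allocated heap buffer and returns
     * it ready for reading. A real cumulator would grow the cumulation in place
     * instead of reallocating on every call; this sketch only shows that a direct
     * input is never adopted as the cumulation.
     */
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        int existing = cumulation == null ? 0 : cumulation.remaining();
        // ByteBuffer.allocate always returns a heap buffer.
        ByteBuffer merged = ByteBuffer.allocate(existing + in.remaining());
        if (cumulation != null) {
            merged.put(cumulation);
        }
        merged.put(in);
        merged.flip();
        return merged;
    }
}
```

   A test along these lines would feed in a buffer from `ByteBuffer.allocateDirect(...)` and check `isDirect()` on the result; the Netty equivalent would presumably check `ByteBuf.isDirect()` on the returned cumulation.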



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java:
##########
@@ -89,19 +90,19 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
         int connectTimeoutMs = (int) conf.get(ConfigOptions.CLIENT_CONNECT_TIMEOUT).toMillis();
         int connectionMaxIdle =
                 (int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
-        PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
+        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
         this.bootstrap =
                 new Bootstrap()
                         .group(eventGroup)
                         .channel(NettyUtils.getClientSocketChannelClass(eventGroup))
-                        .option(ChannelOption.ALLOCATOR, pooledAllocator)
+                        .option(ChannelOption.ALLOCATOR, allocator)
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .option(ChannelOption.SO_KEEPALIVE, true)
-                        .handler(new ClientChannelInitializer(connectionMaxIdle));
+                        .handler(new ClientChannelInitializer(connectionMaxIdle, isInnerClient));

Review Comment:
   I think we should read the prefer-heap option from `Configuration` rather 
than hard-coding it via `isInnerClient` here. 
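
   As a rough sketch of what this could look like, the example below resolves the flag from configuration with a default. It is self-contained and uses a plain `Map` as a stand-in for Fluss's `Configuration`; the class name, the key string, and the default of `false` are all assumptions for illustration, since the actual key and default of `NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST` are not visible in this diff.

```java
import java.util.Map;

/**
 * Illustration only: a Map stands in for Fluss's Configuration, and the key below
 * is hypothetical (the real option's key and default are not shown in the diff).
 */
final class PreferHeapConfigSketch {

    /** Hypothetical key, used only for this example. */
    static final String HEAP_BUFFER_FIRST_KEY = "example.netty.client.heap-buffer-first";

    private PreferHeapConfigSketch() {}

    /** Returns the configured flag, falling back to false (assumed default) when unset. */
    static boolean preferHeap(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(HEAP_BUFFER_FIRST_KEY, "false"));
    }
}
```

   With the real classes, the resolved value would then be passed to `ClientChannelInitializer` in place of `isInnerClient`, so that inner and external clients can both opt in or out through configuration.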



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -931,6 +931,15 @@ public class ConfigOptions {
                             "The number of threads that the client uses for sending requests to the "
                                     + "network and receiving responses from network. The default value is 4");
 
+    public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST =

Review Comment:
   This config option is not used anywhere. I think it is still useful, so we 
should support it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

