Copilot commented on code in PR #2803:
URL: https://github.com/apache/fluss/pull/2803#discussion_r3068181611


##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
 
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));

Review Comment:
   `LengthFieldBasedFrameDecoder#setCumulator(...)` is not publicly accessible 
in Netty 4.1.104.Final (it’s a protected method on `ByteToMessageDecoder`), so 
calling it here is likely to fail compilation. To keep this configurable, wrap 
the frame decoder in a small subclass/anonymous subclass and call 
`setCumulator(...)` from inside that subclass (or provide a custom 
`LengthFieldBasedFrameDecoder` implementation that sets the cumulator in its 
constructor).
   ```suggestion
           LengthFieldBasedFrameDecoder frameDecoder;
           if (preferHeap) {
               frameDecoder =
                       new LengthFieldBasedFrameDecoder(
                               maxFrameLength, 0, 4, 0, initialBytesToStrip) {
                           {
                               setCumulator(new HeapPreferringCumulator(4));
                           }
                       };
           } else {
               frameDecoder =
                       new LengthFieldBasedFrameDecoder(
                               maxFrameLength, 0, 4, 0, initialBytesToStrip);
   ```
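The suggestion hinges on a general Java fact: an anonymous subclass may invoke a protected method of its superclass from an instance initializer block, even when outside callers cannot. A minimal plain-Java sketch of that pattern (no Netty dependency; `Decoder`/`setCumulator` here are simplified stand-ins for `LengthFieldBasedFrameDecoder`/`ByteToMessageDecoder.setCumulator`, not the real classes):

```java
// Stand-in for a decoder whose configuration hook is protected.
class Decoder {
    private String cumulator = "default";

    // Protected: intended for subclasses, not for arbitrary external callers.
    protected void setCumulator(String cumulator) {
        this.cumulator = cumulator;
    }

    public String cumulator() {
        return cumulator;
    }
}

public class ProtectedSetterDemo {
    public static void main(String[] args) {
        // The instance initializer runs during construction, inside the
        // anonymous subclass, where the inherited protected method is visible.
        Decoder heapPreferring = new Decoder() {
            {
                setCumulator("heap-preferring");
            }
        };
        System.out.println(heapPreferring.cumulator()); // heap-preferring
    }
}
```

An equivalent, arguably cleaner alternative is a small named subclass that calls the protected setter in its constructor, which avoids the double-brace idiom.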



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
 
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+        }
+        ch.pipeline().addLast("frameDecoder", frameDecoder);
     }
 
     public void addIdleStateHandler(SocketChannel ch) {
         ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
     }
+
+    /**
+     * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the cumulation buffer on the JVM
+     * heap to avoid off-heap memory pressure from large partial messages.
+     *
+     * <p>On native transports (epoll / kqueue), {@code EpollRecvByteAllocatorHandle.allocate()}
+     * always forces the read buffer to be direct regardless of the channel allocator. The default
+     * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as the cumulation. For
+     * large partial messages, the cumulation can grow up to the maximum frame size (e.g. 32 MB), so
+     * multiple connections could consume significant off-heap memory.
+     */
+    static final class HeapPreferringCumulator implements ByteToMessageDecoder.Cumulator {
+
+        /** Length-field size in bytes used by the frame protocol. */
+        private final int lengthFieldSize;
+
+        public HeapPreferringCumulator(int lengthFieldSize) {
+            this.lengthFieldSize = lengthFieldSize;
+        }
+
+        @Override
+        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
+            if (cumulation == in) {
+                in.release();
+                return cumulation;
+            }
+            if (!cumulation.isReadable() && in.isContiguous()) {
+                // Fast-path: cumulation is empty and the incoming buffer is contiguous.
+                // Allow adoption if the buffer is already heap, or if it is direct but
+                // contains at least one complete frame (the direct memory is short-lived
+                // because the frame decoder will extract and release it immediately).
+                if (!in.isDirect()) {
+                    cumulation.release();
+                    return in;
+                }
+                // Direct buffer with incomplete frame — allocate a heap cumulation
+                // pre-sized to the expected frame length to avoid repeated expansion.

Review Comment:
   The inline comment says we “allow adoption” of a direct buffer if it already 
contains a complete frame, but the code never implements that check and 
currently never adopts direct buffers (all direct inputs go through heap 
allocation). Please either implement the complete-frame fast path (if intended) 
or update the comment to match the actual behavior.
   ```suggestion
                // Adopt the incoming buffer only when it is already heap-backed.
                // Direct buffers are always copied into a heap cumulation so downstream
                // decoding continues to operate on heap memory.
                if (!in.isDirect()) {
                    cumulation.release();
                    return in;
                }
                // Direct buffer — allocate a heap cumulation pre-sized to the expected
                // frame length to avoid repeated expansion while converting to heap.
   ```
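If the complete-frame fast path were instead the intended behavior, the check would mirror the framing configured above: a 4-byte big-endian length field at offset 0 whose value counts the bytes after the field (`lengthFieldOffset = 0`, `lengthFieldLength = 4`, `lengthAdjustment = 0`). A plain-Java sketch of that check using `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` (illustrative only, not the Fluss implementation):

```java
import java.nio.ByteBuffer;

public class FrameCheckDemo {
    /**
     * Returns true if the readable bytes contain at least one complete frame,
     * assuming a 4-byte big-endian length field at the read position whose
     * value counts only the bytes after the field.
     */
    static boolean containsCompleteFrame(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            return false; // the length field itself is still incomplete
        }
        int frameLength = buf.getInt(buf.position()); // absolute get: peek without consuming
        return buf.remaining() >= 4 + frameLength;
    }

    public static void main(String[] args) {
        // 4-byte length field (value 3) followed by all 3 payload bytes.
        ByteBuffer complete = ByteBuffer.allocate(7).putInt(3).put(new byte[] {1, 2, 3});
        complete.flip();
        // Same length field but only 2 of the 3 payload bytes have arrived.
        ByteBuffer partial = ByteBuffer.allocate(6).putInt(3).put(new byte[] {1, 2});
        partial.flip();
        System.out.println(containsCompleteFrame(complete)); // true
        System.out.println(containsCompleteFrame(partial));  // false
    }
}
```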



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java:
##########
@@ -37,10 +37,13 @@ public interface RpcClient extends AutoCloseable {
      *
      * @param conf The configuration to use.
      * @param clientMetricGroup The client metric group
+     * @param isInnerClient Whether it is an inner client used for communicate from server to
+     *     server.
      * @return The RPC client.
      */
-    static RpcClient create(Configuration conf, ClientMetricGroup clientMetricGroup) {
-        return new NettyClient(conf, clientMetricGroup);
+    static RpcClient create(
+            Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) {
+        return new NettyClient(conf, clientMetricGroup, isInnerClient);
     }

Review Comment:
   This change removes the previous `RpcClient.create(conf, metricGroup)` 
factory and replaces it with a new signature that requires a boolean, which is 
an API-breaking change for external callers. Consider keeping the old overload 
(defaulting to the current “non-inner client” behavior), and optionally 
deprecate it to guide users to the new API.
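The compatibility pattern described here is a deprecated delegating overload. A minimal plain-Java sketch, with `Configuration` and `ClientMetricGroup` modeled as `String` placeholders rather than the real Fluss types:

```java
// Simplified stand-in for the RpcClient factory; return values are strings
// here purely so the delegation is observable.
interface RpcClientSketch {
    /**
     * @deprecated kept for binary/source compatibility; delegates to the new
     *     overload with the previous "non-inner client" behavior.
     */
    @Deprecated
    static String create(String conf, String metricGroup) {
        return create(conf, metricGroup, false); // default: not an inner client
    }

    static String create(String conf, String metricGroup, boolean isInnerClient) {
        return "client(inner=" + isInnerClient + ")";
    }
}

public class OverloadDemo {
    public static void main(String[] args) {
        // Existing callers keep compiling and get the old behavior.
        System.out.println(RpcClientSketch.create("conf", "metrics"));       // client(inner=false)
        // New callers opt in explicitly.
        System.out.println(RpcClientSketch.create("conf", "metrics", true)); // client(inner=true)
    }
}
```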



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -931,6 +931,15 @@ public class ConfigOptions {
                             "The number of threads that the client uses for sending requests to the "
                                     + "network and receiving responses from network. The default value is 4");
 
+    public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST =
+            key("netty.client.allocator.heap-buffer-first")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to allocate heap buffer first for the netty client. "
+                                    + "If set to false, direct buffer will be used first, "
+                                    + "which requires sufficient off-heap memory to be available.");
+

Review Comment:
   `NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST` is introduced here but is not 
referenced anywhere else in the codebase, so users cannot actually control the 
new heap-vs-direct behavior via configuration. Please either wire this option 
into `NettyClient`/`ClientChannelInitializer` (and document exactly what it 
affects—cumulation vs allocator) or remove the option until it is supported.
   ```suggestion
   
   ```



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java:
##########
@@ -89,19 +90,19 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
         int connectTimeoutMs = (int) conf.get(ConfigOptions.CLIENT_CONNECT_TIMEOUT).toMillis();
         int connectionMaxIdle =
                 (int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
-        PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
+        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
         this.bootstrap =
                 new Bootstrap()
                         .group(eventGroup)
                         .channel(NettyUtils.getClientSocketChannelClass(eventGroup))
-                        .option(ChannelOption.ALLOCATOR, pooledAllocator)
+                        .option(ChannelOption.ALLOCATOR, allocator)
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .option(ChannelOption.SO_KEEPALIVE, true)
-                        .handler(new ClientChannelInitializer(connectionMaxIdle));
+                        .handler(new ClientChannelInitializer(connectionMaxIdle, isInnerClient));
         this.clientMetricGroup = clientMetricGroup;
         this.authenticatorSupplier = AuthenticationFactory.loadClientAuthenticatorSupplier(conf);
-        NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
+        NettyMetrics.registerNettyMetrics(clientMetricGroup, allocator);
     }

Review Comment:
   `NettyClient` now requires an `isInnerClient` boolean at construction time, 
but the new config option `netty.client.allocator.heap-buffer-first` is not 
used here, so heap-first behavior is effectively hard-coded based on call site. 
Prefer driving heap-vs-direct selection from `Configuration` (with a sensible 
default) and only override for special internal/server use cases if needed.
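One possible shape of "driving heap-vs-direct selection from `Configuration`", sketched in plain Java. The config is modeled as a `Map` since the real `Configuration` API is not shown here, the key mirrors the option introduced in `ConfigOptions`, and the override policy for inner clients is purely an illustrative assumption:

```java
import java.util.Map;

public class PreferHeapSelectionDemo {
    // Mirrors the config key introduced in ConfigOptions in this PR.
    static final String HEAP_FIRST_KEY = "netty.client.allocator.heap-buffer-first";

    /**
     * Decide whether the frame decoder should prefer heap cumulation: the
     * configured value (default true) applies to regular clients, while inner
     * server-to-server clients force heap cumulation in this hypothetical
     * override policy.
     */
    static boolean preferHeap(Map<String, Boolean> conf, boolean isInnerClient) {
        boolean configured = conf.getOrDefault(HEAP_FIRST_KEY, true);
        return isInnerClient || configured;
    }

    public static void main(String[] args) {
        System.out.println(preferHeap(Map.of(), false));                      // true (default)
        System.out.println(preferHeap(Map.of(HEAP_FIRST_KEY, false), false)); // false (configured)
        System.out.println(preferHeap(Map.of(HEAP_FIRST_KEY, false), true));  // true (override)
    }
}
```

Wiring it this way would make the config option observable and leave `isInnerClient` as a narrow override rather than the sole switch.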



##########
fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java:
##########
@@ -678,7 +678,8 @@ private RecordAccumulator createTestRecordAccumulator(
                         conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
                         GatewayClientProxy.createGatewayProxy(
                                 () -> cluster.getRandomTabletServer(),
-                                RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
+                                RpcClient.create(
+                                        conf, TestingClientMetricGroup.newInstance(), false),
                                 TabletServerGateway.class),

Review Comment:
   A test (`DefaultCompletedFetchBufferLifecycleTest`) was removed in this PR, 
which reduces coverage for `DefaultCompletedFetch` ByteBuf reference-count 
lifecycle (leak/double-release regression protection). Unless that test is no 
longer valid, it should be kept or replaced with equivalent coverage; otherwise 
please document why the lifecycle guarantees are no longer needed.



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
 
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+        }
+        ch.pipeline().addLast("frameDecoder", frameDecoder);
     }
 
     public void addIdleStateHandler(SocketChannel ch) {
         ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
     }
+
+    /**
+     * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the cumulation buffer on the JVM
+     * heap to avoid off-heap memory pressure from large partial messages.
+     *
+     * <p>On native transports (epoll / kqueue), {@code EpollRecvByteAllocatorHandle.allocate()}
+     * always forces the read buffer to be direct regardless of the channel allocator. The default
+     * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as the cumulation. For
+     * large partial messages, the cumulation can grow up to the maximum frame size (e.g. 32 MB), so
+     * multiple connections could consume significant off-heap memory.
+     */
+    static final class HeapPreferringCumulator implements ByteToMessageDecoder.Cumulator {

Review Comment:
   The new `HeapPreferringCumulator` / `preferHeap` behavior is non-trivial and 
affects memory management and buffer lifetimes, but there are no dedicated 
tests validating the behavior (e.g., that cumulation becomes heap when enabled, 
and that buffers are released correctly). Please add focused unit tests around 
the cumulator and the `addFrameDecoder(..., preferHeap=true)` path to prevent 
regressions.
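A real test would drive the cumulator with Netty `ByteBuf`s from `UnpooledByteBufAllocator` and assert on `refCnt()`. The two properties worth pinning down can be sketched with a tiny reference-counted stand-in (plain Java, no Netty; `FakeBuf` and this simplified `cumulate` only model the adoption rule described in the review, not the actual Fluss implementation):

```java
// Minimal reference-counting stand-in for a ByteBuf.
final class FakeBuf {
    final boolean direct;
    int refCnt = 1;

    FakeBuf(boolean direct) {
        this.direct = direct;
    }

    void release() {
        refCnt--;
    }
}

public class CumulatorContractDemo {
    /** Models the rule: adopt heap inputs directly, copy direct inputs to heap. */
    static FakeBuf cumulate(FakeBuf cumulation, FakeBuf in) {
        if (!in.direct) {
            cumulation.release(); // drop the empty old cumulation
            return in;            // adopt the heap buffer as-is
        }
        FakeBuf heapCopy = new FakeBuf(false); // copy contents into a heap cumulation
        in.release();                          // input consumed: released exactly once
        cumulation.release();
        return heapCopy;
    }

    public static void main(String[] args) {
        FakeBuf directIn = new FakeBuf(true);
        FakeBuf result = cumulate(new FakeBuf(false), directIn);
        // Property 1: with a direct input, the resulting cumulation is heap-backed.
        System.out.println(!result.direct);       // true
        // Property 2: the input is released exactly once (no leak, no double-release).
        System.out.println(directIn.refCnt == 0); // true
    }
}
```

Tests of this shape (plus an adoption case asserting the heap input is returned unreleased) would catch the most likely regressions: leaked direct inputs and double-released cumulations.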



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
