Copilot commented on code in PR #2803:
URL: https://github.com/apache/fluss/pull/2803#discussion_r3068181611
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
Review Comment:
`LengthFieldBasedFrameDecoder#setCumulator(...)` is not publicly accessible
in Netty 4.1.104.Final (it’s a protected method on `ByteToMessageDecoder`), so
calling it here is likely to fail compilation. To keep this configurable, wrap
the frame decoder in a small subclass/anonymous subclass and call
`setCumulator(...)` from inside that subclass (or provide a custom
`LengthFieldBasedFrameDecoder` implementation that sets the cumulator in its
constructor).
```suggestion
        LengthFieldBasedFrameDecoder frameDecoder;
        if (preferHeap) {
            frameDecoder =
                    new LengthFieldBasedFrameDecoder(
                            maxFrameLength, 0, 4, 0, initialBytesToStrip) {
                        {
                            setCumulator(new HeapPreferringCumulator(4));
                        }
                    };
        } else {
            frameDecoder =
                    new LengthFieldBasedFrameDecoder(
                            maxFrameLength, 0, 4, 0, initialBytesToStrip);
```
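If it helps to see why the suggestion works, the pattern is plain Java visibility: a `protected` setter is unreachable from outside code, but an anonymous subclass's instance initializer can call it. The sketch below demonstrates this on hypothetical stand-in types (`BaseDecoder` and the string cumulator values are not Netty's real classes):

```java
public class ProtectedSetterPattern {
    // Stand-in for ByteToMessageDecoder, where setCumulator is also protected.
    static class BaseDecoder {
        private Object cumulator = "MERGE_CUMULATOR";
        protected void setCumulator(Object c) { this.cumulator = c; }
        Object cumulator() { return cumulator; }
    }

    static BaseDecoder heapPreferring() {
        // Anonymous subclass: its instance initializer runs after the
        // superclass constructor and may call the protected setter,
        // which external callers cannot.
        return new BaseDecoder() {
            {
                setCumulator("HeapPreferringCumulator");
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(heapPreferring().cumulator()); // HeapPreferringCumulator
    }
}
```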
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+        }
+        ch.pipeline().addLast("frameDecoder", frameDecoder);
     }
     public void addIdleStateHandler(SocketChannel ch) {
         ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
     }
+
+    /**
+     * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the cumulation buffer on the JVM
+     * heap to avoid off-heap memory pressure from large partial messages.
+     *
+     * <p>On native transports (epoll / kqueue), {@code EpollRecvByteAllocatorHandle.allocate()}
+     * always forces the read buffer to be direct regardless of the channel allocator. The default
+     * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as the cumulation. For
+     * large partial messages, the cumulation can grow up to the maximum frame size (e.g. 32 MB), so
+     * multiple connections could consume significant off-heap memory.
+     */
+    static final class HeapPreferringCumulator implements ByteToMessageDecoder.Cumulator {
+
+        /** Length-field size in bytes used by the frame protocol. */
+        private final int lengthFieldSize;
+
+        public HeapPreferringCumulator(int lengthFieldSize) {
+            this.lengthFieldSize = lengthFieldSize;
+        }
+
+        @Override
+        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
+            if (cumulation == in) {
+                in.release();
+                return cumulation;
+            }
+            if (!cumulation.isReadable() && in.isContiguous()) {
+                // Fast-path: cumulation is empty and the incoming buffer is contiguous.
+                // Allow adoption if the buffer is already heap, or if it is direct but
+                // contains at least one complete frame (the direct memory is short-lived
+                // because the frame decoder will extract and release it immediately).
+                if (!in.isDirect()) {
+                    cumulation.release();
+                    return in;
+                }
+                // Direct buffer with incomplete frame — allocate a heap cumulation
+                // pre-sized to the expected frame length to avoid repeated expansion.
Review Comment:
The inline comment says we “allow adoption” of a direct buffer if it already
contains a complete frame, but the code never implements that check and
currently never adopts direct buffers (all direct inputs go through heap
allocation). Please either implement the complete-frame fast path (if intended)
or update the comment to match the actual behavior.
```suggestion
                // Adopt the incoming buffer only when it is already heap-backed.
                // Direct buffers are always copied into a heap cumulation so downstream
                // decoding continues to operate on heap memory.
                if (!in.isDirect()) {
                    cumulation.release();
                    return in;
                }
                // Direct buffer — allocate a heap cumulation pre-sized to the expected
                // frame length to avoid repeated expansion while converting to heap.
```
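If the complete-frame fast path is what was intended, the check is only a peek at the 4-byte length prefix. Here is a minimal sketch of that logic, modeled with `java.nio.ByteBuffer` instead of Netty's `ByteBuf` (the class and method names are illustrative, not part of the PR):

```java
import java.nio.ByteBuffer;

public class FrameCheckSketch {
    // True when `in` already holds the length field plus a full frame body.
    // Mirrors LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, ...):
    // a big-endian 4-byte length prefix counting the bytes that follow it.
    static boolean containsCompleteFrame(ByteBuffer in, int lengthFieldSize) {
        if (in.remaining() < lengthFieldSize) {
            return false; // the length prefix itself is still incomplete
        }
        int frameLength = in.getInt(in.position()); // absolute read: peek, do not consume
        return in.remaining() >= lengthFieldSize + frameLength;
    }

    public static void main(String[] args) {
        ByteBuffer complete = ByteBuffer.allocate(7).putInt(3).put(new byte[3]);
        complete.flip();
        ByteBuffer partial = ByteBuffer.allocate(6).putInt(5).put(new byte[2]);
        partial.flip();
        System.out.println(containsCompleteFrame(complete, 4)); // true
        System.out.println(containsCompleteFrame(partial, 4));  // false
    }
}
```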
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java:
##########
@@ -37,10 +37,13 @@ public interface RpcClient extends AutoCloseable {
      *
      * @param conf The configuration to use.
      * @param clientMetricGroup The client metric group
+     * @param isInnerClient Whether it is an inner client used for communicate from server to
+     *     server.
      * @return The RPC client.
      */
-    static RpcClient create(Configuration conf, ClientMetricGroup clientMetricGroup) {
-        return new NettyClient(conf, clientMetricGroup);
+    static RpcClient create(
+            Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) {
+        return new NettyClient(conf, clientMetricGroup, isInnerClient);
     }
Review Comment:
This change removes the previous `RpcClient.create(conf, metricGroup)`
factory and replaces it with a new signature that requires a boolean, which is
an API-breaking change for external callers. Consider keeping the old overload
(defaulting to the current “non-inner client” behavior), and optionally
deprecate it to guide users to the new API.
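One shape the compatible overload could take, sketched against hypothetical stand-in types (the stubs below are not the real fluss classes; the real default overload would still delegate to `NettyClient`):

```java
public class RpcClientOverloadSketch {
    // Hypothetical stubs for the real fluss types, just so the overload
    // pattern compiles and runs on its own.
    static class Configuration {}
    static class ClientMetricGroup {}

    interface RpcClient extends AutoCloseable {
        /**
         * Kept for source/binary compatibility; defaults to the previous
         * non-inner-client behavior.
         */
        @Deprecated
        static RpcClient create(Configuration conf, ClientMetricGroup group) {
            return create(conf, group, false);
        }

        static RpcClient create(
                Configuration conf, ClientMetricGroup group, boolean isInnerClient) {
            // stand-in for `new NettyClient(conf, group, isInnerClient)`
            return () -> {};
        }
    }

    public static void main(String[] args) throws Exception {
        // old two-argument call sites keep compiling unchanged
        try (RpcClient client = RpcClient.create(new Configuration(), new ClientMetricGroup())) {
            System.out.println(client != null); // true
        }
    }
}
```

Existing two-argument callers stay source- and binary-compatible, and the `@Deprecated` marker nudges them toward the new three-argument factory.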
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -931,6 +931,15 @@ public class ConfigOptions {
                             "The number of threads that the client uses for sending requests to the "
                                     + "network and receiving responses from network. The default value is 4");

+    public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST =
+            key("netty.client.allocator.heap-buffer-first")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to allocate heap buffer first for the netty client. "
+                                    + "If set to false, direct buffer will be used first, "
+                                    + "which requires sufficient off-heap memory to be available.");
+
Review Comment:
`NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST` is introduced here but is not
referenced anywhere else in the codebase, so users cannot actually control the
new heap-vs-direct behavior via configuration. Please either wire this option
into `NettyClient`/`ClientChannelInitializer` (and document exactly what it
affects—cumulation vs allocator) or remove the option until it is supported.
```suggestion
```
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java:
##########
@@ -89,19 +90,19 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
         int connectTimeoutMs = (int) conf.get(ConfigOptions.CLIENT_CONNECT_TIMEOUT).toMillis();
         int connectionMaxIdle =
                 (int) conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds();
-        PooledByteBufAllocator pooledAllocator = PooledByteBufAllocator.DEFAULT;
+        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
         this.bootstrap =
                 new Bootstrap()
                         .group(eventGroup)
                         .channel(NettyUtils.getClientSocketChannelClass(eventGroup))
-                        .option(ChannelOption.ALLOCATOR, pooledAllocator)
+                        .option(ChannelOption.ALLOCATOR, allocator)
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .option(ChannelOption.SO_KEEPALIVE, true)
-                        .handler(new ClientChannelInitializer(connectionMaxIdle));
+                        .handler(new ClientChannelInitializer(connectionMaxIdle, isInnerClient));
         this.clientMetricGroup = clientMetricGroup;
         this.authenticatorSupplier = AuthenticationFactory.loadClientAuthenticatorSupplier(conf);
-        NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
+        NettyMetrics.registerNettyMetrics(clientMetricGroup, allocator);
     }
Review Comment:
`NettyClient` now requires an `isInnerClient` boolean at construction time,
but the new config option `netty.client.allocator.heap-buffer-first` is not
used here, so heap-first behavior is effectively hard-coded based on call site.
Prefer driving heap-vs-direct selection from `Configuration` (with a sensible
default) and only override for special internal/server use cases if needed.
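A possible shape for that wiring, sketched with hypothetical stand-ins for `Configuration` and `ClientChannelInitializer` (only the key string follows the PR): read the option once in the constructor and pass the flag down, so heap-first behavior stays configurable rather than fixed per call site.

```java
import java.util.HashMap;
import java.util.Map;

public class HeapFirstWiringSketch {
    // Hypothetical stand-in for fluss's Configuration / ConfigOption lookup.
    static class Configuration {
        private final Map<String, Object> values = new HashMap<>();
        Configuration set(String key, Object v) { values.put(key, v); return this; }
        boolean getBoolean(String key, boolean dflt) {
            return (Boolean) values.getOrDefault(key, dflt);
        }
    }

    // Stand-in for ClientChannelInitializer: the preferHeap flag arrives from
    // configuration instead of being hard-coded by the caller.
    static class ClientChannelInitializer {
        final boolean preferHeap;
        ClientChannelInitializer(int maxIdleSeconds, boolean preferHeap) {
            this.preferHeap = preferHeap;
        }
    }

    static ClientChannelInitializer buildInitializer(Configuration conf) {
        boolean heapFirst =
                conf.getBoolean("netty.client.allocator.heap-buffer-first", true);
        return new ClientChannelInitializer(120, heapFirst);
    }

    public static void main(String[] args) {
        System.out.println(buildInitializer(new Configuration()).preferHeap); // true (default)
        System.out.println(
                buildInitializer(new Configuration()
                                .set("netty.client.allocator.heap-buffer-first", false))
                        .preferHeap); // false
    }
}
```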
##########
fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java:
##########
@@ -678,7 +678,8 @@ private RecordAccumulator createTestRecordAccumulator(
                 conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
                 GatewayClientProxy.createGatewayProxy(
                         () -> cluster.getRandomTabletServer(),
-                        RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
+                        RpcClient.create(
+                                conf, TestingClientMetricGroup.newInstance(), false),
                         TabletServerGateway.class),
Review Comment:
A test (`DefaultCompletedFetchBufferLifecycleTest`) was removed in this PR,
which reduces coverage for `DefaultCompletedFetch` ByteBuf reference-count
lifecycle (leak/double-release regression protection). Unless that test is no
longer valid, it should be kept or replaced with equivalent coverage; otherwise
please document why the lifecycle guarantees are no longer needed.
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws Exception {
         }
     }
-    public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int initialBytesToStrip) {
-        ch.pipeline()
-                .addLast(
-                        "frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                maxFrameLength, 0, 4, 0, initialBytesToStrip));
+    public void addFrameDecoder(
+            SocketChannel ch, int maxFrameLength, int initialBytesToStrip, boolean preferHeap) {
+        LengthFieldBasedFrameDecoder frameDecoder =
+                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, initialBytesToStrip);
+        if (preferHeap) {
+            frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+        }
+        ch.pipeline().addLast("frameDecoder", frameDecoder);
     }
     public void addIdleStateHandler(SocketChannel ch) {
         ch.pipeline().addLast("idle", new IdleStateHandler(0, 0, maxIdleTimeSeconds));
     }
+
+    /**
+     * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the cumulation buffer on the JVM
+     * heap to avoid off-heap memory pressure from large partial messages.
+     *
+     * <p>On native transports (epoll / kqueue), {@code EpollRecvByteAllocatorHandle.allocate()}
+     * always forces the read buffer to be direct regardless of the channel allocator. The default
+     * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as the cumulation. For
+     * large partial messages, the cumulation can grow up to the maximum frame size (e.g. 32 MB), so
+     * multiple connections could consume significant off-heap memory.
+     */
+    static final class HeapPreferringCumulator implements ByteToMessageDecoder.Cumulator {
Review Comment:
The new `HeapPreferringCumulator` / `preferHeap` behavior is non-trivial and
affects memory management and buffer lifetimes, but there are no dedicated
tests validating the behavior (e.g., that cumulation becomes heap when enabled,
and that buffers are released correctly). Please add focused unit tests around
the cumulator and the `addFrameDecoder(..., preferHeap=true)` path to prevent
regressions.
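A focused test could pin the adoption rule without standing up the full Netty pipeline. The sketch below models it with `java.nio` buffers (`isDirect()` mirrors `ByteBuf#isDirect`; the `cumulate` logic is a simplified stand-in for the PR's class, not the class itself):

```java
import java.nio.ByteBuffer;

public class HeapCumulationRuleTest {
    // Simplified model of the adoption rule: an empty cumulation adopts a
    // heap input directly; a direct input is always copied onto the heap.
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        if (cumulation.position() == 0 && !in.isDirect()) {
            return in; // adopt the heap buffer as the new cumulation
        }
        ByteBuffer heap = ByteBuffer.allocate(cumulation.position() + in.remaining());
        cumulation.flip();
        heap.put(cumulation).put(in);
        return heap;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        ByteBuffer heapIn = ByteBuffer.allocate(8);

        // a direct input must end up on the heap
        System.out.println(cumulate(ByteBuffer.allocate(0), direct).isDirect()); // false
        // a heap input is adopted as-is (same instance)
        System.out.println(cumulate(ByteBuffer.allocate(0), heapIn) == heapIn);  // true
    }
}
```

A real test against `HeapPreferringCumulator` would additionally assert the release behavior with `ByteBuf#refCnt()` after each `cumulate` call.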
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]