wuchong commented on code in PR #2803:
URL: https://github.com/apache/fluss/pull/2803#discussion_r3068196568
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/NettyChannelInitializer.java:
##########
@@ -46,15 +49,119 @@ protected void initChannel(SocketChannel ch) throws
Exception {
}
}
- public void addFrameDecoder(SocketChannel ch, int maxFrameLength, int
initialBytesToStrip) {
- ch.pipeline()
- .addLast(
- "frameDecoder",
- new LengthFieldBasedFrameDecoder(
- maxFrameLength, 0, 4, 0, initialBytesToStrip));
+ public void addFrameDecoder(
+ SocketChannel ch, int maxFrameLength, int initialBytesToStrip,
boolean preferHeap) {
+ LengthFieldBasedFrameDecoder frameDecoder =
+ new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0,
initialBytesToStrip);
+ if (preferHeap) {
+ frameDecoder.setCumulator(new HeapPreferringCumulator(4));
+ }
+ ch.pipeline().addLast("frameDecoder", frameDecoder);
}
public void addIdleStateHandler(SocketChannel ch) {
ch.pipeline().addLast("idle", new IdleStateHandler(0, 0,
maxIdleTimeSeconds));
}
+
+ /**
+ * A custom {@link ByteToMessageDecoder.Cumulator} that keeps the
cumulation buffer on the JVM
+ * heap to avoid off-heap memory pressure from large partial messages.
+ *
+ * <p>On native transports (epoll / kqueue), {@code
EpollRecvByteAllocatorHandle.allocate()}
+ * always forces the read buffer to be direct regardless of the channel
allocator. The default
+ * {@code MERGE_CUMULATOR}'s fast-path would adopt this direct buffer as
the cumulation. For
Review Comment:
`{@code MERGE_CUMULATOR}` -> `{@link ByteToMessageDecoder#MERGE_CUMULATOR}`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]