mmodzelewski commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2848996887
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ .remoteAddress(this.host, this.port);
+ }
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
+ }
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
+ // Adding Response Handler Now Statefull
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
+ }
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
}
/**
- * Connects to the server asynchronously.
+ * Returns Pool metrics.
*/
- public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public PoolMetrics getMetrics() {
+ return this.poolMetrics;
+ }
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
+ /**
+ * BroadCasts Command to each connection
+ * (Mainly for login so that each connection in the pool is Authenticated)
+ * Returns the result of the LAST connection's execution, allowing the
caller
+ * to treat this like a single request.
+ */
+ public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf
payload) {
+ try {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is closed"));
}
- });
+ if (channelPool == null) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Client not connected call
connect first"));
+ }
+ List<CompletableFuture<ByteBuf>> lastFuture = new ArrayList<>();
+
+ int poolSize = poolConfig.getMaxConnections();
- return future;
+ for (int i = 0; i < poolSize; i++) {
+ lastFuture.add(send(commandCode, payload.retainedDuplicate()));
+ }
+ for (int i = 0; i < poolSize - 1; i++) {
+ lastFuture.get(i).thenAccept(response -> response.release());
+ }
+ return lastFuture.get(lastFuture.size() - 1);
+ } finally {
+ payload.release();
+ }
}
/**
* Sends a command asynchronously and returns the response.
* Uses Netty's EventLoop to ensure thread-safe sequential request
processing with FIFO response matching.
*/
public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) {
- if (channel == null || !channel.isActive()) {
- payload.release();
+ if (isClosed.get()) {
Review Comment:
the payload should be released in the early returns
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -242,36 +334,94 @@ protected void channelRead0(ChannelHandlerContext ctx,
ByteBuf msg) {
int status = msg.readIntLE();
int length = msg.readIntLE();
- // Since Iggy doesn't use request IDs, we process responses in
order
- // Get the oldest pending request
- if (!pendingRequests.isEmpty()) {
- Long oldestRequestId =
-
pendingRequests.keySet().stream().min(Long::compare).orElse(null);
+ CompletableFuture<ByteBuf> future = responseQueue.poll();
- if (oldestRequestId != null) {
- CompletableFuture<ByteBuf> future =
pendingRequests.remove(oldestRequestId);
+ if (future != null) {
- if (status == 0) {
- // Success - pass the remaining buffer as response
- future.complete(msg.retainedSlice());
- } else {
- // Error - the payload contains the error message
- byte[] errorBytes = length > 0 ? new byte[length] :
new byte[0];
- if (length > 0) {
- msg.readBytes(errorBytes);
- }
+ if (status == 0) {
+ // Success - pass the remaining buffer as response
+ future.complete(msg.retainedSlice());
+ } else {
+ // Error - the payload contains the error message
+ if (length > 0) {
+ byte[] errorBytes = new byte[length];
+ msg.readBytes(errorBytes);
future.completeExceptionally(IggyServerException.fromTcpResponse(status,
errorBytes));
+ } else {
+ future.completeExceptionally(new
IggyServerException(status));
}
}
+ } else {
+ log.error(
+ "Received response on channel {} but no request was
waiting!",
+ ctx.channel().id());
+ }
+ if (pool != null) {
+ pool.release(ctx.channel());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
- // Fail all pending requests
- pendingRequests.values().forEach(future ->
future.completeExceptionally(cause));
- pendingRequests.clear();
+ // If the connection dies, fail ALL waiting requests for this
connection
+ CompletableFuture<ByteBuf> f;
+ while ((f = responseQueue.poll()) != null) {
+ f.completeExceptionally(cause);
+ }
ctx.close();
}
}
+
+ // Inner Class for Channel pool configurations
+ public static class TCPConnectionPoolConfig {
+ private final int maxConnections;
+ private final int maxPendingAcquires;
+ private final long acquireTimeoutMillis;
+
+ public TCPConnectionPoolConfig(int maxConnections, int
maxPendingAcquires, long acquireTimeoutMillis) {
+ this.maxConnections = maxConnections;
+ this.maxPendingAcquires = maxPendingAcquires;
+ this.acquireTimeoutMillis = acquireTimeoutMillis;
+ }
+
+ public int getMaxConnections() {
+ return this.maxConnections;
+ }
+
+ public int getMaxPendingAcquires() {
+ return this.maxPendingAcquires;
+ }
+
+ public long getAcquireTimeoutMillis() {
+ return this.acquireTimeoutMillis;
+ }
+
+ // Builder Class for TCPConnectionPoolConfig
+ public static final class Builder {
+ private int maxConnections = 5;
+ private int maxPendingAcquires = 1000;
+ private long acquireTimeoutMillis = 5000;
Review Comment:
assign above values to constants, and use them in no-args constructor for
the config
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ .remoteAddress(this.host, this.port);
+ }
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
Review Comment:
in the SDK code, please only use exceptions that inherit from `IggyException`
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -207,33 +300,32 @@ public CompletableFuture<ByteBuf> send(int commandCode,
ByteBuf payload) {
* Closes the connection and releases resources.
*/
public CompletableFuture<Void> close() {
- CompletableFuture<Void> future = new CompletableFuture<>();
-
- if (channel != null && channel.isActive()) {
- channel.close().addListener((ChannelFutureListener) channelFuture
-> {
- eventLoopGroup.shutdownGracefully();
- if (channelFuture.isSuccess()) {
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
- }
- });
- } else {
- eventLoopGroup.shutdownGracefully();
- future.complete(null);
+ if (isClosed.compareAndSet(false, true)) {
+ if (channelPool != null) {
+ channelPool.close();
+ }
+ return
CompletableFuture.runAsync(eventLoopGroup::shutdownGracefully);
Review Comment:
It would make sense to wait for the result of `shutdownGracefully`.
Currently, it is ignored.
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java:
##########
@@ -158,7 +159,15 @@ public static AsyncIggyTcpClientBuilder builder() {
* @return a {@link CompletableFuture} that completes when the connection
is established
*/
public CompletableFuture<Void> connect() {
- connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate);
+ TCPConnectionPoolConfig.Builder poolConfigBuilder = new
TCPConnectionPoolConfig.Builder();
+ if (connectionPoolSize.isPresent()) {
+ poolConfigBuilder.setMaxConnections(connectionPoolSize.get());
+ }
+ if (connectionTimeout.isPresent()) {
+
poolConfigBuilder.setAcquireTimeoutMillis(connectionTimeout.get().toMillis());
Review Comment:
seems like connection timeout should map to `CONNECT_TIMEOUT_MILLIS`,
acquire timeout is semantically different thing
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java:
##########
@@ -158,7 +159,15 @@ public static AsyncIggyTcpClientBuilder builder() {
* @return a {@link CompletableFuture} that completes when the connection
is established
*/
public CompletableFuture<Void> connect() {
- connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate);
+ TCPConnectionPoolConfig.Builder poolConfigBuilder = new
TCPConnectionPoolConfig.Builder();
+ if (connectionPoolSize.isPresent()) {
Review Comment:
instead of `isPresent/get`, it would be more idiomatic to just call
`ifPresent` with an update lambda
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -242,36 +334,94 @@ protected void channelRead0(ChannelHandlerContext ctx,
ByteBuf msg) {
int status = msg.readIntLE();
int length = msg.readIntLE();
- // Since Iggy doesn't use request IDs, we process responses in
order
- // Get the oldest pending request
- if (!pendingRequests.isEmpty()) {
- Long oldestRequestId =
-
pendingRequests.keySet().stream().min(Long::compare).orElse(null);
+ CompletableFuture<ByteBuf> future = responseQueue.poll();
- if (oldestRequestId != null) {
- CompletableFuture<ByteBuf> future =
pendingRequests.remove(oldestRequestId);
+ if (future != null) {
- if (status == 0) {
- // Success - pass the remaining buffer as response
- future.complete(msg.retainedSlice());
- } else {
- // Error - the payload contains the error message
- byte[] errorBytes = length > 0 ? new byte[length] :
new byte[0];
- if (length > 0) {
- msg.readBytes(errorBytes);
- }
+ if (status == 0) {
+ // Success - pass the remaining buffer as response
+ future.complete(msg.retainedSlice());
+ } else {
+ // Error - the payload contains the error message
+ if (length > 0) {
+ byte[] errorBytes = new byte[length];
+ msg.readBytes(errorBytes);
future.completeExceptionally(IggyServerException.fromTcpResponse(status,
errorBytes));
+ } else {
+ future.completeExceptionally(new
IggyServerException(status));
}
}
+ } else {
+ log.error(
+ "Received response on channel {} but no request was
waiting!",
+ ctx.channel().id());
+ }
+ if (pool != null) {
+ pool.release(ctx.channel());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
- // Fail all pending requests
- pendingRequests.values().forEach(future ->
future.completeExceptionally(cause));
- pendingRequests.clear();
+ // If the connection dies, fail ALL waiting requests for this
connection
+ CompletableFuture<ByteBuf> f;
+ while ((f = responseQueue.poll()) != null) {
+ f.completeExceptionally(cause);
+ }
ctx.close();
}
}
+
+ // Inner Class for Channel pool configurations
Review Comment:
Please drop all comments that do not introduce any new information. Comments
are justified in situations where code is not obvious, something is done in a
non-standard way, etc.
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PoolMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metrics for monitoring connection pool performance.
+ * Tracks active connections, wait times, and errors.
+ */
+public class PoolMetrics {
Review Comment:
I'd drop the metrics for now. Let's make sure that the connection pool is
working fine first. This can be a follow up.
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ .remoteAddress(this.host, this.port);
+ }
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
+ }
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
+ // Adding Response Handler Now Statefull
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
+ }
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
}
/**
- * Connects to the server asynchronously.
+ * Returns Pool metrics.
*/
- public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public PoolMetrics getMetrics() {
+ return this.poolMetrics;
+ }
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
+ /**
+ * BroadCasts Command to each connection
+ * (Mainly for login so that each connection in the pool is Authenticated)
+ * Returns the result of the LAST connection's execution, allowing the
caller
+ * to treat this like a single request.
+ */
+ public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf
payload) {
+ try {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is closed"));
}
- });
+ if (channelPool == null) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Client not connected call
connect first"));
+ }
+ List<CompletableFuture<ByteBuf>> lastFuture = new ArrayList<>();
+
+ int poolSize = poolConfig.getMaxConnections();
- return future;
+ for (int i = 0; i < poolSize; i++) {
+ lastFuture.add(send(commandCode, payload.retainedDuplicate()));
+ }
+ for (int i = 0; i < poolSize - 1; i++) {
+ lastFuture.get(i).thenAccept(response -> response.release());
+ }
+ return lastFuture.get(lastFuture.size() - 1);
+ } finally {
+ payload.release();
+ }
}
/**
* Sends a command asynchronously and returns the response.
* Uses Netty's EventLoop to ensure thread-safe sequential request
processing with FIFO response matching.
*/
public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) {
- if (channel == null || !channel.isActive()) {
- payload.release();
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(
+ new IggyNotConnectedException("Connection not established
or closed"));
+ }
+ if (channelPool == null) {
return CompletableFuture.failedFuture(
new IggyNotConnectedException("Connection not established
or closed"));
}
+ // Starting the response clock
+ long starttime = System.nanoTime();
+
CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
- // Execute on channel's EventLoop to ensure sequential processing
- // This is necessary because Iggy protocol doesn't include request IDs,
- // and responses are matched using FIFO order
- channel.eventLoop().execute(() -> {
- // Since Iggy doesn't use request IDs, we'll just use a simple
queue
- // Each request will get the next response in order
- long requestId = requestIdGenerator.incrementAndGet();
- pendingRequests.put(requestId, responseFuture);
-
- // Build the request frame exactly like the blocking client
- // Frame format: [payload_size:4][command:4][payload:N]
- // where payload_size = 4 (command size) + N (payload size)
- int payloadSize = payload.readableBytes();
- int framePayloadSize = 4 + payloadSize; // command (4 bytes) +
payload
-
- ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
- frame.writeIntLE(framePayloadSize); // Length field (includes
command)
- frame.writeIntLE(commandCode); // Command
- frame.writeBytes(payload, payload.readerIndex(), payloadSize); //
Payload
-
- // Debug: print frame bytes
- if (log.isTraceEnabled()) {
- byte[] frameBytes = new byte[Math.min(frame.readableBytes(),
30)];
- frame.getBytes(0, frameBytes);
- StringBuilder hex = new StringBuilder();
- for (byte b : frameBytes) {
- hex.append(String.format("%02x ", b));
- }
- log.trace(
- "Sending frame with command: {}, payload size: {},
frame payload size (with command): {}, total frame size: {}",
- commandCode,
- payloadSize,
- framePayloadSize,
- frame.readableBytes());
- log.trace("Frame bytes (hex): {}", hex.toString());
+ channelPool.acquire().addListener((FutureListener<Channel>) f -> {
+
+ // Stoping the watch to record waitime
+ long waitTime = System.nanoTime() - starttime;
+ poolMetrics.recordWaitTime(waitTime);
+
+ if (!f.isSuccess()) {
+ poolMetrics.recordError();
+ responseFuture.completeExceptionally(f.cause());
+ return;
}
- payload.release();
+ // Connection Aquired
+ poolMetrics.incrementActive();
- // Send the frame
- channel.writeAndFlush(frame).addListener((ChannelFutureListener)
future -> {
- if (!future.isSuccess()) {
- log.error("Failed to send frame: {}",
future.cause().getMessage());
- pendingRequests.remove(requestId);
- responseFuture.completeExceptionally(future.cause());
- } else {
- log.trace("Frame sent successfully to {}",
channel.remoteAddress());
+ Channel channel = f.getNow();
+ try {
+ IggyResponseHandler handler =
channel.pipeline().get(IggyResponseHandler.class);
+
+ CompletableFuture<ByteBuf> trackedFuture =
responseFuture.whenComplete((res, ex) -> {
+ poolMetrics.decrementActive();
+ });
+ if (handler == null) {
+ throw new IllegalStateException("Channel missing
IggyResponseHandler");
}
- });
+
+ // Enqueuing request so handler knows who to call back;
+ handler.enqueueRequest(responseFuture);
+
+ // Build the request frame exactly like the blocking client
+ // Frame format: [payload_size:4][command:4][payload:N]
+ // where payload_size = 4 (command size) + N (payload size)
+ int payloadSize = payload.readableBytes();
+ int framePayloadSize = 4 + payloadSize; // command (4 bytes) +
payload
+
+ ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
+ frame.writeIntLE(framePayloadSize); // Length field (includes
command)
+ frame.writeIntLE(commandCode); // Command
+ frame.writeBytes(payload, payload.readerIndex(), payloadSize);
// Payload
+
+ // Debug: print frame bytes
+ byte[] frameBytes = new byte[Math.min(frame.readableBytes(),
30)];
Review Comment:
This array should be allocated within the if trace block, otherwise it's
allocated but never used.
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -61,21 +67,31 @@ public class AsyncTcpConnection {
private final SslContext sslContext;
private final EventLoopGroup eventLoopGroup;
private final Bootstrap bootstrap;
- private Channel channel;
- private final AtomicLong requestIdGenerator = new AtomicLong(0);
- private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>>
pendingRequests = new ConcurrentHashMap<>();
+ private SimpleChannelPool channelPool;
+ private final TCPConnectionPoolConfig poolConfig;
+ private final PoolMetrics poolMetrics;
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
public AsyncTcpConnection(String host, int port) {
- this(host, port, false, Optional.empty());
+ this(host, port, false, Optional.empty(), new
TCPConnectionPoolConfig(5, 1000, 1000));
Review Comment:
Please create a no-args constructor for the config, and use it here instead.
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -242,36 +334,94 @@ protected void channelRead0(ChannelHandlerContext ctx,
ByteBuf msg) {
int status = msg.readIntLE();
int length = msg.readIntLE();
- // Since Iggy doesn't use request IDs, we process responses in
order
- // Get the oldest pending request
- if (!pendingRequests.isEmpty()) {
- Long oldestRequestId =
-
pendingRequests.keySet().stream().min(Long::compare).orElse(null);
+ CompletableFuture<ByteBuf> future = responseQueue.poll();
- if (oldestRequestId != null) {
- CompletableFuture<ByteBuf> future =
pendingRequests.remove(oldestRequestId);
+ if (future != null) {
- if (status == 0) {
- // Success - pass the remaining buffer as response
- future.complete(msg.retainedSlice());
- } else {
- // Error - the payload contains the error message
- byte[] errorBytes = length > 0 ? new byte[length] :
new byte[0];
- if (length > 0) {
- msg.readBytes(errorBytes);
- }
+ if (status == 0) {
+ // Success - pass the remaining buffer as response
+ future.complete(msg.retainedSlice());
+ } else {
+ // Error - the payload contains the error message
+ if (length > 0) {
+ byte[] errorBytes = new byte[length];
+ msg.readBytes(errorBytes);
future.completeExceptionally(IggyServerException.fromTcpResponse(status,
errorBytes));
+ } else {
+ future.completeExceptionally(new
IggyServerException(status));
}
}
+ } else {
+ log.error(
+ "Received response on channel {} but no request was
waiting!",
+ ctx.channel().id());
+ }
+ if (pool != null) {
+ pool.release(ctx.channel());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
- // Fail all pending requests
- pendingRequests.values().forEach(future ->
future.completeExceptionally(cause));
- pendingRequests.clear();
+ // If the connection dies, fail ALL waiting requests for this
connection
+ CompletableFuture<ByteBuf> f;
+ while ((f = responseQueue.poll()) != null) {
+ f.completeExceptionally(cause);
+ }
ctx.close();
}
}
+
+ // Inner Class for Channel pool configurations
+ public static class TCPConnectionPoolConfig {
+ private final int maxConnections;
+ private final int maxPendingAcquires;
+ private final long acquireTimeoutMillis;
+
+ public TCPConnectionPoolConfig(int maxConnections, int
maxPendingAcquires, long acquireTimeoutMillis) {
+ this.maxConnections = maxConnections;
+ this.maxPendingAcquires = maxPendingAcquires;
+ this.acquireTimeoutMillis = acquireTimeoutMillis;
+ }
+
+ public int getMaxConnections() {
+ return this.maxConnections;
+ }
+
+ public int getMaxPendingAcquires() {
+ return this.maxPendingAcquires;
+ }
+
+ public long getAcquireTimeoutMillis() {
+ return this.acquireTimeoutMillis;
+ }
+
+ // Builder Class for TCPConnectionPoolConfig
+ public static final class Builder {
+ private int maxConnections = 5;
+ private int maxPendingAcquires = 1000;
+ private long acquireTimeoutMillis = 5000;
+
+ public Builder() {}
+
+ public Builder setMaxConnections(int maxConnections) {
Review Comment:
let's at least add some simple validations for the values inserted to the
config
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ .remoteAddress(this.host, this.port);
+ }
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
+ }
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
+ // Adding Response Handler Now Statefull
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
+ }
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
}
/**
- * Connects to the server asynchronously.
+ * Returns Pool metrics.
*/
- public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public PoolMetrics getMetrics() {
+ return this.poolMetrics;
+ }
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
+ /**
+ * BroadCasts Command to each connection
+ * (Mainly for login so that each connection in the pool is Authenticated)
+ * Returns the result of the LAST connection's execution, allowing the
caller
+ * to treat this like a single request.
+ */
+ public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf
payload) {
Review Comment:
`broadcastAsync()` for authentication is unreliable - `FixedChannelPool`
creates channels lazily, so sending `poolSize` login requests may hit the same
channel multiple times (it gets released in `channelRead0` and re-acquired)
rather than reaching every distinct connection. Connections created later (e.g.
reconnect after health-check failure) will also be unauthenticated.
A simpler approach: authenticate lazily in `send()`. After acquiring a
channel, check a channel attribute (e.g. `channel.attr(AUTH_KEY)`). If not
authenticated, send login first, set the attribute on success, then send the
actual command. This works naturally with the async model, handles all channel
lifecycle events (pool growth, reconnection, idle eviction), and eliminates the
need for `broadcastAsync()` entirely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]