This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d20f31a8 feat(java): implement Async Connection Pooling using FixedChannelPool (#2606)
7d20f31a8 is described below

commit 7d20f31a8091f0b17076bab5b544629b826fb79a
Author: Rythm Sachdeva <[email protected]>
AuthorDate: Tue Mar 10 01:05:05 2026 +0530

    feat(java): implement Async Connection Pooling using FixedChannelPool (#2606)
    
    Closes #2204
---
 .../iggy/client/async/tcp/AsyncIggyTcpClient.java  |  15 +-
 .../async/tcp/AsyncIggyTcpClientBuilder.java       |  18 +
 .../iggy/client/async/tcp/AsyncTcpConnection.java  | 408 +++++++++++++++------
 .../iggy/client/async/tcp/IggyAuthenticator.java   |  79 ++++
 .../iggy/client/async/tcp/IggyFrameEncoder.java    |  63 ++++
 5 files changed, 459 insertions(+), 124 deletions(-)

diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index e76493d1d..04e5e1d28 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -28,6 +28,7 @@ import org.apache.iggy.client.async.StreamsClient;
 import org.apache.iggy.client.async.SystemClient;
 import org.apache.iggy.client.async.TopicsClient;
 import org.apache.iggy.client.async.UsersClient;
+import 
org.apache.iggy.client.async.tcp.AsyncTcpConnection.TCPConnectionPoolConfig;
 import org.apache.iggy.config.RetryPolicy;
 import org.apache.iggy.exception.IggyMissingCredentialsException;
 import org.apache.iggy.exception.IggyNotConnectedException;
@@ -94,7 +95,7 @@ public class AsyncIggyTcpClient {
     private final int port;
     private final Optional<String> username;
     private final Optional<String> password;
-    private final Optional<Duration> connectionTimeout;
+    private final Optional<Duration> acquireTimeout;
     private final Optional<Duration> requestTimeout;
     private final Optional<Integer> connectionPoolSize;
     private final Optional<RetryPolicy> retryPolicy;
@@ -129,7 +130,7 @@ public class AsyncIggyTcpClient {
             int port,
             String username,
             String password,
-            Duration connectionTimeout,
+            Duration acquireTimeout,
             Duration requestTimeout,
             Integer connectionPoolSize,
             RetryPolicy retryPolicy,
@@ -139,7 +140,7 @@ public class AsyncIggyTcpClient {
         this.port = port;
         this.username = Optional.ofNullable(username);
         this.password = Optional.ofNullable(password);
-        this.connectionTimeout = Optional.ofNullable(connectionTimeout);
+        this.acquireTimeout = Optional.ofNullable(acquireTimeout);
         this.requestTimeout = Optional.ofNullable(requestTimeout);
         this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
         this.retryPolicy = Optional.ofNullable(retryPolicy);
@@ -166,7 +167,12 @@ public class AsyncIggyTcpClient {
      * @return a {@link CompletableFuture} that completes when the connection 
is established
      */
     public CompletableFuture<Void> connect() {
-        connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate);
+        TCPConnectionPoolConfig.Builder poolConfigBuilder = new 
TCPConnectionPoolConfig.Builder();
+        connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections);
+        acquireTimeout.ifPresent(timeout -> 
poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis()));
+        TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
+        // TCPConnectionPoolConfig poolConfig = new TCPConnectionPoolConfig();
+        connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate, poolConfig);
         return connection.connect().thenRun(() -> {
             messagesClient = new MessagesTcpClient(connection);
             consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
@@ -215,7 +221,6 @@ public class AsyncIggyTcpClient {
         }
         return usersClient;
     }
-
     /**
      * Returns the async messages client for producing and consuming messages.
      *
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
index 8d27e1d01..1db799094 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
@@ -73,6 +73,7 @@ public final class AsyncIggyTcpClientBuilder {
     private Duration requestTimeout;
     private Integer connectionPoolSize;
     private RetryPolicy retryPolicy;
+    private Duration acquireTimeout;
 
     AsyncIggyTcpClientBuilder() {}
 
@@ -155,6 +156,17 @@ public final class AsyncIggyTcpClientBuilder {
         return this;
     }
 
+    /**
+     * Sets channel acquire timeout
+     *
+     * @param acquireTimeout the acquire timeout duration
+     * @return this builder
+     */
+    public AsyncIggyTcpClientBuilder acquireTimeout(Duration acquireTimeout) {
+        this.acquireTimeout = acquireTimeout;
+        return this;
+    }
+
     /**
      * Sets the connection timeout.
      *
@@ -213,6 +225,12 @@ public final class AsyncIggyTcpClientBuilder {
         if (port == null || port <= 0) {
             throw new IggyInvalidArgumentException("Port must be a positive 
integer");
         }
+        if (connectionPoolSize != null && connectionPoolSize <= 0) {
+            throw new IggyInvalidArgumentException("Connection pool size 
cannot by 0 or negative");
+        }
+        if (acquireTimeout != null && (acquireTimeout.equals(Duration.ZERO) || 
acquireTimeout.isNegative())) {
+            throw new IggyInvalidArgumentException("AcquireTimeout Cannot be 0 
or Negative");
+        }
         return new AsyncIggyTcpClient(
                 host,
                 port,
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index a49ebc7f8..ca0a1f2f4 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -24,17 +24,22 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.pool.AbstractChannelPoolHandler;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.channel.pool.SimpleChannelPool;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.util.concurrent.FutureListener;
+import org.apache.iggy.exception.IggyClientException;
 import org.apache.iggy.exception.IggyEmptyResponseException;
+import org.apache.iggy.exception.IggyInvalidArgumentException;
 import org.apache.iggy.exception.IggyNotConnectedException;
 import org.apache.iggy.exception.IggyServerException;
 import org.apache.iggy.exception.IggyTlsException;
@@ -47,9 +52,10 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 /**
@@ -58,7 +64,6 @@ import java.util.function.Function;
  */
 public class AsyncTcpConnection {
     private static final Logger log = 
LoggerFactory.getLogger(AsyncTcpConnection.class);
-
     private final String host;
     private final int port;
     private final boolean enableTls;
@@ -66,21 +71,31 @@ public class AsyncTcpConnection {
     private final SslContext sslContext;
     private final EventLoopGroup eventLoopGroup;
     private final Bootstrap bootstrap;
-    private Channel channel;
-    private final AtomicLong requestIdGenerator = new AtomicLong(0);
-    private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>> 
pendingRequests = new ConcurrentHashMap<>();
+    private SimpleChannelPool channelPool;
+    private final TCPConnectionPoolConfig poolConfig;
+    private ByteBuf loginPayload;
+    private AtomicBoolean isAuthenticated = new AtomicBoolean(false);
+
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
     public AsyncTcpConnection(String host, int port) {
-        this(host, port, false, Optional.empty());
+        this(host, port, false, Optional.empty(), new 
TCPConnectionPoolConfig());
     }
 
-    public AsyncTcpConnection(String host, int port, boolean enableTls, 
Optional<File> tlsCertificate) {
+    public AsyncTcpConnection(
+            String host,
+            int port,
+            boolean enableTls,
+            Optional<File> tlsCertificate,
+            TCPConnectionPoolConfig poolConfig) {
         this.host = host;
         this.port = port;
         this.enableTls = enableTls;
         this.tlsCertificate = tlsCertificate;
+        this.poolConfig = poolConfig;
         this.eventLoopGroup = new NioEventLoopGroup();
         this.bootstrap = new Bootstrap();
+
         if (this.enableTls) {
             try {
                 SslContextBuilder builder = SslContextBuilder.forClient();
@@ -100,45 +115,50 @@ public class AsyncTcpConnection {
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .option(ChannelOption.SO_KEEPALIVE, true)
-                .handler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    protected void initChannel(SocketChannel ch) {
-                        ChannelPipeline pipeline = ch.pipeline();
-
-                        if (enableTls) {
-                            pipeline.addLast("ssl", 
sslContext.newHandler(ch.alloc(), host, port));
-                        }
-
-                        // Custom frame decoder for Iggy protocol responses
-                        pipeline.addLast("frameDecoder", new 
IggyFrameDecoder());
-
-                        // No encoder needed - we build complete frames 
following Iggy protocol
-                        // The protocol already includes the length field, so 
adding an encoder
-                        // would duplicate it. This matches the blocking 
client implementation.
-
-                        // Response handler
-                        pipeline.addLast("responseHandler", new 
IggyResponseHandler(pendingRequests));
-                    }
-                });
+                .remoteAddress(this.host, this.port);
     }
 
     /**
-     * Connects to the server asynchronously.
+     * Initialises Connection pool.
      */
     public CompletableFuture<Void> connect() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        if (isClosed.get()) {
+            return CompletableFuture.failedFuture(new 
IggyClientException("Client is Closed"));
+        }
+        AbstractChannelPoolHandler poolHandler = new 
AbstractChannelPoolHandler() {
+            @Override
+            public void channelCreated(Channel ch) {
+                ChannelPipeline pipeline = ch.pipeline();
+                if (enableTls) {
+                    // adding ssl if ssl enabled
+                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), 
host, port));
+                }
+                // Adding the FrameDecoder to end of channel pipeline
+                pipeline.addLast("frameDecoder", new IggyFrameDecoder());
 
-        bootstrap.connect(host, port).addListener((ChannelFutureListener) 
channelFuture -> {
-            if (channelFuture.isSuccess()) {
-                channel = channelFuture.channel();
-                future.complete(null);
-            } else {
-                future.completeExceptionally(channelFuture.cause());
+                // Adding Response Handler Now Stateful
+                pipeline.addLast("responseHandler", new IggyResponseHandler());
             }
-        });
 
-        return future;
+            @Override
+            public void channelAcquired(Channel ch) {
+                IggyResponseHandler handler = 
ch.pipeline().get(IggyResponseHandler.class);
+                handler.setPool(channelPool);
+            }
+        };
+
+        this.channelPool = new FixedChannelPool(
+                bootstrap,
+                poolHandler,
+                ChannelHealthChecker.ACTIVE, // Check If the connection is 
Active Before Lending
+                FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take 
too long
+                poolConfig.getAcquireTimeoutMillis(),
+                poolConfig.getMaxConnections(),
+                poolConfig.getMaxPendingAcquires());
+        log.info("Connection pool initialized with max connections: {}", 
poolConfig.getMaxConnections());
+        return CompletableFuture.completedFuture(null);
     }
 
     public <T> CompletableFuture<T> exchangeForEntity(
@@ -197,99 +217,166 @@ public class AsyncTcpConnection {
      * Uses Netty's EventLoop to ensure thread-safe sequential request 
processing with FIFO response matching.
      */
     public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) {
-        if (channel == null || !channel.isActive()) {
-            payload.release();
+        if (isClosed.get()) {
+            return CompletableFuture.failedFuture(
+                    new IggyNotConnectedException("Connection not established 
or closed"));
+        }
+        if (channelPool == null) {
             return CompletableFuture.failedFuture(
                     new IggyNotConnectedException("Connection not established 
or closed"));
         }
 
+        captureLoginPayloadIfNeeded(commandCode, payload);
         CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
 
-        // Execute on channel's EventLoop to ensure sequential processing
-        // This is necessary because Iggy protocol doesn't include request IDs,
-        // and responses are matched using FIFO order
-        channel.eventLoop().execute(() -> {
-            // Since Iggy doesn't use request IDs, we'll just use a simple 
queue
-            // Each request will get the next response in order
-            long requestId = requestIdGenerator.incrementAndGet();
-            pendingRequests.put(requestId, responseFuture);
-
-            // Build the request frame exactly like the blocking client
-            // Frame format: [payload_size:4][command:4][payload:N]
-            // where payload_size = 4 (command size) + N (payload size)
-            int payloadSize = payload.readableBytes();
-            int framePayloadSize = 4 + payloadSize; // command (4 bytes) + 
payload
-
-            ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
-            frame.writeIntLE(framePayloadSize); // Length field (includes 
command)
-            frame.writeIntLE(commandCode); // Command
-            frame.writeBytes(payload, payload.readerIndex(), payloadSize); // 
Payload
-
-            // Debug: print frame bytes
-            if (log.isTraceEnabled()) {
-                byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 
30)];
-                frame.getBytes(0, frameBytes);
-                StringBuilder hex = new StringBuilder();
-                for (byte b : frameBytes) {
-                    hex.append(String.format("%02x ", b));
+        channelPool.acquire().addListener((FutureListener<Channel>) f -> {
+            if (!f.isSuccess()) {
+                responseFuture.completeExceptionally(f.cause());
+                return;
+            }
+
+            Channel channel = f.getNow();
+            if (Boolean.FALSE.equals(isAuthenticated.get())) {
+                IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
+            }
+            CompletableFuture<Void> authStep;
+            boolean isLoginOp = (commandCode == 
CommandCode.User.LOGIN.getValue()
+                    || commandCode == 
CommandCode.PersonalAccessToken.LOGIN.getValue());
+
+            if (isLoginOp) {
+                authStep = CompletableFuture.completedFuture(null);
+            } else {
+                if (loginPayload == null) {
+                    responseFuture.completeExceptionally(new 
IggyNotConnectedException("Login First"));
                 }
-                log.trace(
-                        "Sending frame with command: {}, payload size: {}, 
frame payload size (with command): {}, total frame size: {}",
-                        commandCode,
-                        payloadSize,
-                        framePayloadSize,
-                        frame.readableBytes());
-                log.trace("Frame bytes (hex): {}", hex.toString());
+                authStep = IggyAuthenticator.ensureAuthenticated(
+                        channel, loginPayload.retainedDuplicate(), 
CommandCode.User.LOGIN.getValue());
             }
 
+            authStep.thenRun(() -> sendFrame(channel, payload, commandCode, 
responseFuture))
+                    .exceptionally(ex -> {
+                        payload.release();
+                        responseFuture.completeExceptionally(ex);
+                        return null;
+                    });
+
+            responseFuture.handle((res, ex) -> {
+                handlePostResponse(channel, commandCode, isLoginOp, ex);
+                return null;
+            });
+        });
+
+        return responseFuture;
+    }
+
+    private void sendFrame(
+            Channel channel, ByteBuf payload, int commandCode, 
CompletableFuture<ByteBuf> responseFuture) {
+        try {
+
+            IggyResponseHandler handler = 
channel.pipeline().get(IggyResponseHandler.class);
+            if (handler == null) {
+                throw new IggyClientException("Channel missing 
IggyResponseHandler");
+            }
+
+            // Enqueuing request so handler knows who to call back;
+            handler.enqueueRequest(responseFuture);
+
+            ByteBuf frame = IggyFrameEncoder.encode(channel.alloc(), 
commandCode, payload);
+
             payload.release();
 
             // Send the frame
             channel.writeAndFlush(frame).addListener((ChannelFutureListener) 
future -> {
                 if (!future.isSuccess()) {
                     log.error("Failed to send frame: {}", 
future.cause().getMessage());
-                    pendingRequests.remove(requestId);
+                    frame.release();
+                    channel.close();
                     responseFuture.completeExceptionally(future.cause());
                 } else {
                     log.trace("Frame sent successfully to {}", 
channel.remoteAddress());
                 }
             });
-        });
 
-        return responseFuture;
+        } catch (RuntimeException e) {
+            responseFuture.completeExceptionally(e);
+        }
+    }
+
+    private void handlePostResponse(Channel channel, int commandCode, boolean 
isLoginOp, Throwable ex) {
+        if (isLoginOp) {
+            if (ex == null) {
+                isAuthenticated.set(true);
+            } else {
+                releaseLoginPayload();
+            }
+        }
+        if (commandCode == CommandCode.User.LOGOUT.getValue()) {
+            isAuthenticated.set(false);
+            IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
+        }
+        if (channelPool != null) {
+            channelPool.release(channel);
+        }
+    }
+
+    private void captureLoginPayloadIfNeeded(int commandCode, ByteBuf payload) 
{
+        if (commandCode == CommandCode.User.LOGIN.getValue() || commandCode == 
CommandCode.User.UPDATE.getValue()) {
+            updateLoginPayload(payload);
+        }
+    }
+
+    private synchronized void updateLoginPayload(ByteBuf payload) {
+        if (this.loginPayload != null) {
+            loginPayload.release();
+        }
+        this.loginPayload = payload.retainedSlice();
+    }
+
+    private synchronized void releaseLoginPayload() {
+        if (this.loginPayload != null) {
+            loginPayload.release();
+            this.loginPayload = null;
+        }
     }
 
     /**
      * Closes the connection and releases resources.
      */
     public CompletableFuture<Void> close() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-
-        if (channel != null && channel.isActive()) {
-            channel.close().addListener((ChannelFutureListener) channelFuture 
-> {
-                eventLoopGroup.shutdownGracefully();
-                if (channelFuture.isSuccess()) {
-                    future.complete(null);
+        if (isClosed.compareAndSet(false, true)) {
+            if (channelPool != null) {
+                channelPool.close();
+            }
+            CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+            eventLoopGroup.shutdownGracefully().addListener(f -> {
+                if (f.isSuccess()) {
+                    shutdownFuture.complete(null);
                 } else {
-                    future.completeExceptionally(channelFuture.cause());
+                    shutdownFuture.completeExceptionally(null);
                 }
             });
-        } else {
-            eventLoopGroup.shutdownGracefully();
-            future.complete(null);
+            return shutdownFuture;
         }
-
-        return future;
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
      * Response handler that correlates responses with requests.
      */
-    private static class IggyResponseHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
-        private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>> 
pendingRequests;
+    public static class IggyResponseHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
+        private final Queue<CompletableFuture<ByteBuf>> responseQueue = new 
ConcurrentLinkedQueue<>();
+        private SimpleChannelPool pool;
+
+        public IggyResponseHandler() {
+            this.pool = null;
+        }
+
+        public void setPool(SimpleChannelPool pool) {
+            this.pool = pool;
+        }
 
-        public IggyResponseHandler(ConcurrentHashMap<Long, 
CompletableFuture<ByteBuf>> pendingRequests) {
-            this.pendingRequests = pendingRequests;
+        public void enqueueRequest(CompletableFuture<ByteBuf> future) {
+            responseQueue.add(future);
         }
 
         @Override
@@ -298,36 +385,119 @@ public class AsyncTcpConnection {
             int status = msg.readIntLE();
             int length = msg.readIntLE();
 
-            // Since Iggy doesn't use request IDs, we process responses in 
order
-            // Get the oldest pending request
-            if (!pendingRequests.isEmpty()) {
-                Long oldestRequestId =
-                        
pendingRequests.keySet().stream().min(Long::compare).orElse(null);
-
-                if (oldestRequestId != null) {
-                    CompletableFuture<ByteBuf> future = 
pendingRequests.remove(oldestRequestId);
-
-                    if (status == 0) {
-                        // Success - pass the remaining buffer as response
-                        future.complete(msg.retainedSlice());
-                    } else {
-                        // Error - the payload contains the error message
-                        byte[] errorBytes = length > 0 ? new byte[length] : 
new byte[0];
-                        if (length > 0) {
-                            msg.readBytes(errorBytes);
-                        }
-                        
future.completeExceptionally(IggyServerException.fromTcpResponse(status, 
errorBytes));
-                    }
+            CompletableFuture<ByteBuf> future = responseQueue.poll();
+
+            if (future != null) {
+
+                if (status == 0) {
+                    // Success - pass the remaining buffer as response
+                    future.complete(msg.retainedSlice());
+                } else {
+                    // Error - the payload contains the error message
+
+                    byte[] errorBytes = length > 0 ? new byte[length] : new 
byte[0];
+                    msg.readBytes(errorBytes);
+                    
future.completeExceptionally(IggyServerException.fromTcpResponse(status, 
errorBytes));
                 }
+            } else {
+                log.error(
+                        "Received response on channel {} but no request was 
waiting!",
+                        ctx.channel().id());
             }
         }
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
-            // Fail all pending requests
-            pendingRequests.values().forEach(future -> 
future.completeExceptionally(cause));
-            pendingRequests.clear();
+            // If the connection dies, fail ALL waiting requests for this 
connection
+            CompletableFuture<ByteBuf> f;
+            while ((f = responseQueue.poll()) != null) {
+                f.completeExceptionally(cause);
+            }
+            if (pool != null) {
+                pool.release(ctx.channel());
+            }
             ctx.close();
         }
     }
+
+    public static class TCPConnectionPoolConfig {
+        private final int maxConnections;
+        private final int maxPendingAcquires;
+        private final long acquireTimeoutMillis;
+
+        public TCPConnectionPoolConfig() {
+            this(
+                    Builder.DEFAULT_MAX_CONNECTION,
+                    Builder.DEFAULT_MAX_PENDING_ACQUIRES,
+                    Builder.DEFAULT_ACQUIRE_TIMEOUT_MILLIS);
+        }
+
+        public TCPConnectionPoolConfig(int maxConnections, int 
maxPendingAcquires, long acquireTimeoutMillis) {
+            this.maxConnections = maxConnections;
+            this.maxPendingAcquires = maxPendingAcquires;
+            this.acquireTimeoutMillis = acquireTimeoutMillis;
+        }
+
+        public int getMaxConnections() {
+            return this.maxConnections;
+        }
+
+        public int getMaxPendingAcquires() {
+            return this.maxPendingAcquires;
+        }
+
+        public long getAcquireTimeoutMillis() {
+            return this.acquireTimeoutMillis;
+        }
+
+        // Builder Class for TCPConnectionPoolConfig
+        public static final class Builder {
+            public static final int DEFAULT_MAX_CONNECTION = 5;
+            public static final int DEFAULT_MAX_PENDING_ACQUIRES = 1000;
+            public static final int DEFAULT_ACQUIRE_TIMEOUT_MILLIS = 3000;
+
+            private int maxConnections;
+            private int maxPendingAcquires;
+            private long acquireTimeoutMillis;
+
+            public Builder() {}
+
+            public Builder setMaxConnections(int maxConnections) {
+                if (maxConnections <= 0) {
+                    throw new IggyInvalidArgumentException("Connection pool 
size cannot be 0 or negative");
+                }
+                this.maxConnections = maxConnections;
+                return this;
+            }
+
+            public Builder setMaxPendingAcquires(int maxPendingAcquires) {
+                if (maxPendingAcquires <= 0) {
+                    throw new IggyInvalidArgumentException("Max Pending 
Acquires cannot be 0 or negative");
+                }
+                this.maxPendingAcquires = maxPendingAcquires;
+                return this;
+            }
+
+            public Builder setAcquireTimeoutMillis(long acquireTimeoutMillis) {
+                if (acquireTimeoutMillis <= 0) {
+                    throw new IggyInvalidArgumentException("Acquire timeout 
cannot be 0 or negative");
+                }
+                this.acquireTimeoutMillis = acquireTimeoutMillis;
+                return this;
+            }
+
+            public TCPConnectionPoolConfig build() {
+                if (this.maxConnections == 0) {
+                    this.maxConnections = DEFAULT_MAX_CONNECTION;
+                }
+                if (this.acquireTimeoutMillis == 0) {
+                    this.acquireTimeoutMillis = DEFAULT_ACQUIRE_TIMEOUT_MILLIS;
+                }
+                if (this.maxPendingAcquires == 0) {
+                    this.maxPendingAcquires = DEFAULT_MAX_PENDING_ACQUIRES;
+                }
+                return new TCPConnectionPoolConfig(maxConnections, 
maxPendingAcquires, acquireTimeoutMillis);
+            }
+        }
+    }
 }
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
new file mode 100644
index 000000000..b11f17ba5
--- /dev/null
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.AttributeKey;
+import org.apache.iggy.client.async.tcp.AsyncTcpConnection.IggyResponseHandler;
+import org.apache.iggy.exception.IggyAuthenticationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class IggyAuthenticator {
+    private static final AttributeKey<Boolean> AUTH_KEY = 
AttributeKey.valueOf("AUTH_KEY");
+    private static final Logger log = 
LoggerFactory.getLogger(IggyAuthenticator.class);
+
+    private IggyAuthenticator() {}
+
+    public static CompletableFuture<Void> ensureAuthenticated(Channel channel, 
ByteBuf loginPayload, int commandCode) {
+        Boolean isAuth = channel.attr(AUTH_KEY).get();
+        if (Boolean.TRUE.equals(isAuth)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (loginPayload.equals(null)) {
+            return CompletableFuture.failedFuture(
+                    new IggyAuthenticationException(null, commandCode, "login 
first", null, null));
+        }
+
+        CompletableFuture<ByteBuf> loginFuture = new CompletableFuture<>();
+        IggyResponseHandler handler = 
channel.pipeline().get(IggyResponseHandler.class);
+        handler.enqueueRequest(loginFuture);
+        ByteBuf frame = IggyFrameEncoder.encode(channel.alloc(), commandCode, 
loginPayload);
+        loginPayload.release();
+        channel.writeAndFlush(frame).addListener((ChannelFutureListener) f -> {
+            if (!f.isSuccess()) {
+                frame.release();
+                loginFuture.completeExceptionally(f.cause());
+            }
+        });
+
+        return loginFuture.thenAccept(result -> {
+            try {
+                channel.attr(AUTH_KEY).set(true);
+                log.debug("Channel {} authenticated successfully", 
channel.id());
+            } finally {
+                result.release();
+            }
+        });
+    }
+
+    public static void setAuthAttribute(Channel channel, AtomicBoolean value) {
+        channel.attr(AUTH_KEY).set(value.get());
+    }
+
+    public static Boolean getAuthAttribute(Channel channel) {
+        return channel.attr(AUTH_KEY).get();
+    }
+}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
new file mode 100644
index 000000000..b26b31743
--- /dev/null
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds request frames for the Iggy TCP protocol.
+ *
+ * <p>Frame layout (little-endian integers): {@code [payload_size:4][command:4][payload:N]}, where
+ * {@code payload_size = 4 + N} because the length field also counts the command code.
+ */
+public final class IggyFrameEncoder {
+    private static final Logger log = LoggerFactory.getLogger(IggyFrameEncoder.class);
+
+    /** Size in bytes of the leading length field. */
+    private static final int LENGTH_FIELD_SIZE = 4;
+
+    /** Size in bytes of the command code field. */
+    private static final int COMMAND_SIZE = 4;
+
+    /** Upper bound on the number of frame bytes shown in the trace-level hex dump. */
+    private static final int MAX_TRACE_BYTES = 30;
+
+    private IggyFrameEncoder() {}
+
+    /**
+     * Encodes a command code and its payload into a newly allocated frame.
+     *
+     * <p>The payload bytes are copied using an absolute index; this method does not release
+     * {@code payload}, and the caller owns the returned buffer.
+     *
+     * @param alloc       allocator used for the frame buffer
+     * @param commandCode command code written after the length field
+     * @param payload     request body; its readable bytes are copied into the frame
+     * @return the encoded frame, ready to be written to a channel
+     */
+    public static ByteBuf encode(ByteBufAllocator alloc, int commandCode, ByteBuf payload) {
+        // Build the request frame exactly like the blocking client
+        int payloadSize = payload.readableBytes();
+        int framePayloadSize = COMMAND_SIZE + payloadSize; // command (4 bytes) + payload
+        ByteBuf frame = alloc.buffer(LENGTH_FIELD_SIZE + framePayloadSize);
+        frame.writeIntLE(framePayloadSize); // Length field (includes command)
+        frame.writeIntLE(commandCode); // Command
+        frame.writeBytes(payload, payload.readerIndex(), payloadSize); // Payload
+
+        // Only pay for the hex dump (array + formatting) when trace logging is actually on.
+        if (log.isTraceEnabled()) {
+            traceFrame(frame, commandCode, payloadSize, framePayloadSize);
+        }
+
+        return frame;
+    }
+
+    /** Logs the frame sizes and a hex dump of its first bytes without moving the reader index. */
+    private static void traceFrame(ByteBuf frame, int commandCode, int payloadSize, int framePayloadSize) {
+        byte[] frameBytes = new byte[Math.min(frame.readableBytes(), MAX_TRACE_BYTES)];
+        frame.getBytes(frame.readerIndex(), frameBytes);
+        StringBuilder hex = new StringBuilder(frameBytes.length * 3);
+        for (byte b : frameBytes) {
+            hex.append(String.format("%02x ", b));
+        }
+        log.trace(
+                "Sending frame with command: {}, payload size: {}, frame payload size (with command): {}, total frame size: {}",
+                commandCode,
+                payloadSize,
+                framePayloadSize,
+                frame.readableBytes());
+        log.trace("Frame bytes (hex): {}", hex.toString());
+    }
+}


Reply via email to