mmodzelewski commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2851693075


##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .option(ChannelOption.SO_KEEPALIVE, true)
-                .handler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    protected void initChannel(SocketChannel ch) {
-                        ChannelPipeline pipeline = ch.pipeline();
-
-                        if (enableTls) {
-                            pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port));
-                        }
+                .remoteAddress(this.host, this.port);
+    }
 
-                        // Custom frame decoder for Iggy protocol responses
-                        pipeline.addLast("frameDecoder", new IggyFrameDecoder());
+    /**
+     * Initialises Connection pool.
+     */
+    public CompletableFuture<Void> connect() {
+        if (isClosed.get()) {
+            return CompletableFuture.failedFuture(new IllegalStateException("Client is Closed"));
+        }
+        AbstractChannelPoolHandler poolHandler = new AbstractChannelPoolHandler() {
+            @Override
+            public void channelCreated(Channel ch) {
+                ChannelPipeline pipeline = ch.pipeline();
+                if (enableTls) {
+                    // adding ssl if ssl enabled
+                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port));
+                }
+                // Adding the FrameDecoder to end of channel pipeline
+                pipeline.addLast("frameDecoder", new IggyFrameDecoder());
 
-                        // No encoder needed - we build complete frames following Iggy protocol
-                        // The protocol already includes the length field, so adding an encoder
-                        // would duplicate it. This matches the blocking client implementation.
+                // Adding Response Handler Now Statefull
+                pipeline.addLast("responseHandler", new IggyResponseHandler());
+            }
 
-                        // Response handler
-                        pipeline.addLast("responseHandler", new IggyResponseHandler(pendingRequests));
-                    }
-                });
+            @Override
+            public void channelAcquired(Channel ch) {
+                IggyResponseHandler handler = ch.pipeline().get(IggyResponseHandler.class);
+                handler.setPool(channelPool);
+            }
+        };
+
+        this.channelPool = new FixedChannelPool(
+                bootstrap,
+                poolHandler,
+                ChannelHealthChecker.ACTIVE, // Check If the connection is Active Before Lending
+                FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take too long
+                poolConfig.getAcquireTimeoutMillis(),
+                poolConfig.getMaxConnections(),
+                poolConfig.getMaxPendingAcquires());
+        log.info("Connection pool initialized with max connections: {}", poolConfig.getMaxConnections());
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
-     * Connects to the server asynchronously.
+     * Returns Pool metrics.
      */
-    public CompletableFuture<Void> connect() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public PoolMetrics getMetrics() {
+        return this.poolMetrics;
+    }
 
-        bootstrap.connect(host, port).addListener((ChannelFutureListener) channelFuture -> {
-            if (channelFuture.isSuccess()) {
-                channel = channelFuture.channel();
-                future.complete(null);
-            } else {
-                future.completeExceptionally(channelFuture.cause());
+    /**
+     * BroadCasts Command to each connection
+     * (Mainly for login so that each connection in the pool is Authenticated)
+     * Returns the result of the LAST connection's execution, allowing the caller
+     * to treat this like a single request.
+     */
+    public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf payload) {

Review Comment:
   The user-facing API stays the same - `login()` is still called once. 
Internally, `login()` would store the credentials and authenticate the one 
channel it acquires. Then in `send()`, after acquiring a channel, you check a 
channel attribute (e.g. `AUTH_KEY`). If the channel isn't authenticated yet, 
send login with the stored credentials first, set the attribute, then send the 
actual command. This way, every channel gets authenticated transparently on 
first use, including channels created later by the pool.
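
A minimal sketch of the flow described above, written against plain Java so it runs without Netty on the classpath. Everything in it is hypothetical and for illustration only: `FakeChannel` stands in for a pooled Netty `Channel`, its `authenticated` field stands in for a channel attribute such as `ch.attr(AUTH_KEY)`, and the `Client` class, the string commands, and the round-robin deque are assumptions rather than code from this PR. It is also single-threaded; a real implementation would need to handle concurrent acquires and failed logins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class LazyAuthSketch {

    /** Hypothetical stand-in for a pooled channel; records every command written to it. */
    static final class FakeChannel {
        final List<String> sent = new ArrayList<>();
        // With Netty, this flag would be stored as a channel attribute instead of a field.
        boolean authenticated;

        CompletableFuture<String> write(String command) {
            sent.add(command);
            return CompletableFuture.completedFuture("ok:" + command);
        }
    }

    /** Hypothetical client that authenticates each channel on first use. */
    static final class Client {
        private final Deque<FakeChannel> idle = new ArrayDeque<>();
        private String storedCredentials;

        Client(List<FakeChannel> channels) {
            idle.addAll(channels);
        }

        /** Called once by the user: stores credentials and authenticates one channel. */
        CompletableFuture<String> login(String credentials) {
            storedCredentials = credentials;
            FakeChannel ch = acquire();
            return authenticate(ch).whenComplete((result, error) -> release(ch));
        }

        /** Sends a command, logging in first if the acquired channel is not authenticated. */
        CompletableFuture<String> send(String command) {
            if (storedCredentials == null) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("login() has not been called"));
            }
            FakeChannel ch = acquire();
            CompletableFuture<String> ready = ch.authenticated
                    ? CompletableFuture.completedFuture("already authenticated")
                    : authenticate(ch);
            return ready.thenCompose(ignored -> ch.write(command))
                    .whenComplete((result, error) -> release(ch));
        }

        private CompletableFuture<String> authenticate(FakeChannel ch) {
            // Mark the channel only after the login write has completed.
            return ch.write("login:" + storedCredentials).thenApply(response -> {
                ch.authenticated = true;
                return response;
            });
        }

        private FakeChannel acquire() {
            FakeChannel ch = idle.pollFirst();
            if (ch == null) {
                throw new IllegalStateException("pool exhausted");
            }
            return ch;
        }

        private void release(FakeChannel ch) {
            idle.addLast(ch);
        }
    }

    public static void main(String[] args) {
        FakeChannel first = new FakeChannel();
        FakeChannel second = new FakeChannel();
        Client client = new Client(List.of(first, second));
        client.login("user").join();
        client.send("ping").join();
        System.out.println("first channel saw:  " + first.sent);
        System.out.println("second channel saw: " + second.sent);
    }
}
```

In this model the caller never sees the second login: `send()` issues it on whichever channel has not been authenticated yet, which is also how a channel created later by the pool would be covered.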



