rythm-sachdeva commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2850332429
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ .remoteAddress(this.host, this.port);
+ }
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
+ }
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
+ // Adding Response Handler Now Statefull
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
+ }
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
}
/**
- * Connects to the server asynchronously.
+ * Returns Pool metrics.
*/
- public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public PoolMetrics getMetrics() {
+ return this.poolMetrics;
+ }
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
+ /**
+ * BroadCasts Command to each connection
+ * (Mainly for login so that each connection in the pool is Authenticated)
+ * Returns the result of the LAST connection's execution, allowing the
caller
+ * to treat this like a single request.
+ */
+ public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf
payload) {
Review Comment:
Currently, login is called by the user once, after client creation. In
this case, how would the flow work?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]