This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 7d20f31a8 feat(java): implement Async Connection Pooling using
FixedChannelPool (#2606)
7d20f31a8 is described below
commit 7d20f31a8091f0b17076bab5b544629b826fb79a
Author: Rythm Sachdeva <[email protected]>
AuthorDate: Tue Mar 10 01:05:05 2026 +0530
feat(java): implement Async Connection Pooling using FixedChannelPool
(#2606)
Closes #2204
---
.../iggy/client/async/tcp/AsyncIggyTcpClient.java | 15 +-
.../async/tcp/AsyncIggyTcpClientBuilder.java | 18 +
.../iggy/client/async/tcp/AsyncTcpConnection.java | 408 +++++++++++++++------
.../iggy/client/async/tcp/IggyAuthenticator.java | 79 ++++
.../iggy/client/async/tcp/IggyFrameEncoder.java | 63 ++++
5 files changed, 459 insertions(+), 124 deletions(-)
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index e76493d1d..04e5e1d28 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -28,6 +28,7 @@ import org.apache.iggy.client.async.StreamsClient;
import org.apache.iggy.client.async.SystemClient;
import org.apache.iggy.client.async.TopicsClient;
import org.apache.iggy.client.async.UsersClient;
+import
org.apache.iggy.client.async.tcp.AsyncTcpConnection.TCPConnectionPoolConfig;
import org.apache.iggy.config.RetryPolicy;
import org.apache.iggy.exception.IggyMissingCredentialsException;
import org.apache.iggy.exception.IggyNotConnectedException;
@@ -94,7 +95,7 @@ public class AsyncIggyTcpClient {
private final int port;
private final Optional<String> username;
private final Optional<String> password;
- private final Optional<Duration> connectionTimeout;
+ private final Optional<Duration> acquireTimeout;
private final Optional<Duration> requestTimeout;
private final Optional<Integer> connectionPoolSize;
private final Optional<RetryPolicy> retryPolicy;
@@ -129,7 +130,7 @@ public class AsyncIggyTcpClient {
int port,
String username,
String password,
- Duration connectionTimeout,
+ Duration acquireTimeout,
Duration requestTimeout,
Integer connectionPoolSize,
RetryPolicy retryPolicy,
@@ -139,7 +140,7 @@ public class AsyncIggyTcpClient {
this.port = port;
this.username = Optional.ofNullable(username);
this.password = Optional.ofNullable(password);
- this.connectionTimeout = Optional.ofNullable(connectionTimeout);
+ this.acquireTimeout = Optional.ofNullable(acquireTimeout);
this.requestTimeout = Optional.ofNullable(requestTimeout);
this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
this.retryPolicy = Optional.ofNullable(retryPolicy);
@@ -166,7 +167,12 @@ public class AsyncIggyTcpClient {
* @return a {@link CompletableFuture} that completes when the connection
is established
*/
public CompletableFuture<Void> connect() {
- connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate);
+ TCPConnectionPoolConfig.Builder poolConfigBuilder = new
TCPConnectionPoolConfig.Builder();
+ connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections);
+ acquireTimeout.ifPresent(timeout ->
poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis()));
+ TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
+ // TCPConnectionPoolConfig poolConfig = new TCPConnectionPoolConfig();
+ connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate, poolConfig);
return connection.connect().thenRun(() -> {
messagesClient = new MessagesTcpClient(connection);
consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
@@ -215,7 +221,6 @@ public class AsyncIggyTcpClient {
}
return usersClient;
}
-
/**
* Returns the async messages client for producing and consuming messages.
*
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
index 8d27e1d01..1db799094 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
@@ -73,6 +73,7 @@ public final class AsyncIggyTcpClientBuilder {
private Duration requestTimeout;
private Integer connectionPoolSize;
private RetryPolicy retryPolicy;
+ private Duration acquireTimeout;
AsyncIggyTcpClientBuilder() {}
@@ -155,6 +156,17 @@ public final class AsyncIggyTcpClientBuilder {
return this;
}
+ /**
+ * Sets the channel acquire timeout.
+ *
+ * <p>The value is only stored on the builder here. {@code build()} rejects a zero or negative
+ * duration, while {@code null} is accepted and leaves the timeout unset.
+ *
+ * @param acquireTimeout the acquire timeout duration, or {@code null} to leave it unset
+ * @return this builder
+ */
+ public AsyncIggyTcpClientBuilder acquireTimeout(Duration acquireTimeout) {
+ this.acquireTimeout = acquireTimeout;
+ return this;
+ }
+
/**
* Sets the connection timeout.
*
@@ -213,6 +225,12 @@ public final class AsyncIggyTcpClientBuilder {
if (port == null || port <= 0) {
throw new IggyInvalidArgumentException("Port must be a positive
integer");
}
+ // A null pool size means "not configured"; only an explicit non-positive value is rejected.
+ if (connectionPoolSize != null && connectionPoolSize <= 0) {
+ throw new IggyInvalidArgumentException("Connection pool size cannot be 0 or negative");
+ }
+ // The timeout is later converted with toMillis(), so a positive duration shorter than one
+ // millisecond would also arrive as 0. Checking the millisecond value rejects it here, together
+ // with zero and negative durations, instead of failing later when the pool is configured.
+ if (acquireTimeout != null && acquireTimeout.toMillis() <= 0) {
+ throw new IggyInvalidArgumentException("AcquireTimeout Cannot be 0 or Negative");
+ }
return new AsyncIggyTcpClient(
host,
port,
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index a49ebc7f8..ca0a1f2f4 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -24,17 +24,22 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.pool.AbstractChannelPoolHandler;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.util.concurrent.FutureListener;
+import org.apache.iggy.exception.IggyClientException;
import org.apache.iggy.exception.IggyEmptyResponseException;
+import org.apache.iggy.exception.IggyInvalidArgumentException;
import org.apache.iggy.exception.IggyNotConnectedException;
import org.apache.iggy.exception.IggyServerException;
import org.apache.iggy.exception.IggyTlsException;
@@ -47,9 +52,10 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
@@ -58,7 +64,6 @@ import java.util.function.Function;
*/
public class AsyncTcpConnection {
private static final Logger log =
LoggerFactory.getLogger(AsyncTcpConnection.class);
-
private final String host;
private final int port;
private final boolean enableTls;
@@ -66,21 +71,31 @@ public class AsyncTcpConnection {
private final SslContext sslContext;
private final EventLoopGroup eventLoopGroup;
private final Bootstrap bootstrap;
- private Channel channel;
- private final AtomicLong requestIdGenerator = new AtomicLong(0);
- private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>>
pendingRequests = new ConcurrentHashMap<>();
+ private SimpleChannelPool channelPool;
+ private final TCPConnectionPoolConfig poolConfig;
+ private ByteBuf loginPayload;
+ private AtomicBoolean isAuthenticated = new AtomicBoolean(false);
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ /**
+ * Creates a connection with TLS disabled, no TLS certificate, and a default
+ * {@link TCPConnectionPoolConfig}.
+ *
+ * @param host remote host used as the bootstrap's remote address
+ * @param port remote port used as the bootstrap's remote address
+ */
public AsyncTcpConnection(String host, int port) {
- this(host, port, false, Optional.empty());
+ this(host, port, false, Optional.empty(), new
TCPConnectionPoolConfig());
}
- public AsyncTcpConnection(String host, int port, boolean enableTls,
Optional<File> tlsCertificate) {
+ public AsyncTcpConnection(
+ String host,
+ int port,
+ boolean enableTls,
+ Optional<File> tlsCertificate,
+ TCPConnectionPoolConfig poolConfig) {
this.host = host;
this.port = port;
this.enableTls = enableTls;
this.tlsCertificate = tlsCertificate;
+ this.poolConfig = poolConfig;
this.eventLoopGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
+
if (this.enableTls) {
try {
SslContextBuilder builder = SslContextBuilder.forClient();
@@ -100,45 +115,50 @@ public class AsyncTcpConnection {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
-
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
-
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
-
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ .remoteAddress(this.host, this.port);
}
/**
- * Connects to the server asynchronously.
+ * Initialises the connection pool.
+ *
+ * <p>Builds a {@link FixedChannelPool} on the shared bootstrap, sized and timed from the
+ * {@link TCPConnectionPoolConfig} given to the constructor. No connect call is issued in this
+ * method; it returns an already-completed future once the pool object has been created.
+ *
+ * @return a future failed with {@link IggyClientException} when this connection was already
+ * closed, otherwise an already-completed future
*/
public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IggyClientException("Client is Closed"));
+ }
+ // The handler installs the pipeline on each newly created channel.
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
+ // Adding Response Handler Now Stateful
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
}
- });
- return future;
+ // Gives the channel's response handler a reference to the pool on every acquire.
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ // NOTE(review): calling connect() twice replaces channelPool without closing the previous
+ // pool - confirm callers never do this.
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
}
public <T> CompletableFuture<T> exchangeForEntity(
@@ -197,99 +217,166 @@ public class AsyncTcpConnection {
* Uses Netty's EventLoop to ensure thread-safe sequential request
processing with FIFO response matching.
*/
public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) {
- if (channel == null || !channel.isActive()) {
- payload.release();
+ // This method takes ownership of payload (sendFrame releases it after encoding), so every
+ // early exit below has to release it as well.
+ if (isClosed.get()) {
+ payload.release();
+ return CompletableFuture.failedFuture(
+ new IggyNotConnectedException("Connection not established
or closed"));
+ }
+ if (channelPool == null) {
+ payload.release();
return CompletableFuture.failedFuture(
new IggyNotConnectedException("Connection not established
or closed"));
}
+ // Keep a retained copy of the login payload so other pooled channels can log in later.
+ captureLoginPayloadIfNeeded(commandCode, payload);
CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
- // Execute on channel's EventLoop to ensure sequential processing
- // This is necessary because Iggy protocol doesn't include request IDs,
- // and responses are matched using FIFO order
- channel.eventLoop().execute(() -> {
- // Since Iggy doesn't use request IDs, we'll just use a simple
queue
- // Each request will get the next response in order
- long requestId = requestIdGenerator.incrementAndGet();
- pendingRequests.put(requestId, responseFuture);
-
- // Build the request frame exactly like the blocking client
- // Frame format: [payload_size:4][command:4][payload:N]
- // where payload_size = 4 (command size) + N (payload size)
- int payloadSize = payload.readableBytes();
- int framePayloadSize = 4 + payloadSize; // command (4 bytes) +
payload
-
- ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
- frame.writeIntLE(framePayloadSize); // Length field (includes
command)
- frame.writeIntLE(commandCode); // Command
- frame.writeBytes(payload, payload.readerIndex(), payloadSize); //
Payload
-
- // Debug: print frame bytes
- if (log.isTraceEnabled()) {
- byte[] frameBytes = new byte[Math.min(frame.readableBytes(),
30)];
- frame.getBytes(0, frameBytes);
- StringBuilder hex = new StringBuilder();
- for (byte b : frameBytes) {
- hex.append(String.format("%02x ", b));
+ channelPool.acquire().addListener((FutureListener<Channel>) f -> {
+ if (!f.isSuccess()) {
+ // No channel was obtained, so nothing will ever consume the payload.
+ payload.release();
+ responseFuture.completeExceptionally(f.cause());
+ return;
+ }
+
+ Channel channel = f.getNow();
+ // While the connection as a whole is logged out, mark this channel as unauthenticated too.
+ if (Boolean.FALSE.equals(isAuthenticated.get())) {
+ IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
+ }
+ CompletableFuture<Void> authStep;
+ boolean isLoginOp = (commandCode ==
CommandCode.User.LOGIN.getValue()
+ || commandCode ==
CommandCode.PersonalAccessToken.LOGIN.getValue());
+
+ if (isLoginOp) {
+ authStep = CompletableFuture.completedFuture(null);
+ } else {
+ // Snapshot the field once so the null check and the use below see the same buffer.
+ ByteBuf cachedLogin = loginPayload;
+ if (cachedLogin == null) {
+ // Without a cached login there is nothing to authenticate this channel with:
+ // give back the payload and the channel, fail the request, and stop here
+ // instead of falling through and dereferencing null.
+ payload.release();
+ channelPool.release(channel);
+ responseFuture.completeExceptionally(new IggyNotConnectedException("Login First"));
+ return;
}
- log.trace(
- "Sending frame with command: {}, payload size: {},
frame payload size (with command): {}, total frame size: {}",
- commandCode,
- payloadSize,
- framePayloadSize,
- frame.readableBytes());
- log.trace("Frame bytes (hex): {}", hex.toString());
+ authStep = IggyAuthenticator.ensureAuthenticated(
+ channel, cachedLogin.retainedDuplicate(),
CommandCode.User.LOGIN.getValue());
}
+ // sendFrame reports its own failures, so this handler only fires when authStep failed.
+ authStep.thenRun(() -> sendFrame(channel, payload, commandCode,
responseFuture))
+ .exceptionally(ex -> {
+ payload.release();
+ responseFuture.completeExceptionally(ex);
+ return null;
+ });
+
+ // Whatever the outcome, update the auth state and return the channel to the pool.
+ responseFuture.handle((res, ex) -> {
+ handlePostResponse(channel, commandCode, isLoginOp, ex);
+ return null;
+ });
+ });
+
+ return responseFuture;
+ }
+
+ /**
+ * Encodes the request into a frame and writes it to the given channel.
+ *
+ * <p>This method always consumes {@code payload}: it is released after encoding, or in the
+ * failure path if encoding never happened. Any failure is reported through
+ * {@code responseFuture} rather than thrown.
+ *
+ * @param channel the pooled channel to write to
+ * @param payload the command payload; released by this method
+ * @param commandCode the command code placed in the frame header
+ * @param responseFuture the future completed with the response or with the failure
+ */
+ private void sendFrame(
+ Channel channel, ByteBuf payload, int commandCode,
CompletableFuture<ByteBuf> responseFuture) {
+ // Tracks whether payload was already released so the catch block never releases it twice.
+ boolean payloadReleased = false;
+ try {
+
+ IggyResponseHandler handler =
channel.pipeline().get(IggyResponseHandler.class);
+ if (handler == null) {
+ throw new IggyClientException("Channel missing
IggyResponseHandler");
+ }
+
+ ByteBuf frame = IggyFrameEncoder.encode(channel.alloc(),
commandCode, payload);
+
payload.release();
+ payloadReleased = true;
+
+ // Enqueue only once the frame exists, so a failed encode leaves no stale entry in the
+ // handler's FIFO queue of waiting requests.
+ handler.enqueueRequest(responseFuture);
// Send the frame
channel.writeAndFlush(frame).addListener((ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
log.error("Failed to send frame: {}",
future.cause().getMessage());
- pendingRequests.remove(requestId);
+ // Netty releases the frame itself when a write fails. Releasing it again here would
+ // throw and skip both the close and the future completion below.
+ channel.close();
responseFuture.completeExceptionally(future.cause());
} else {
log.trace("Frame sent successfully to {}",
channel.remoteAddress());
}
});
- });
- return responseFuture;
+ } catch (RuntimeException e) {
+ if (!payloadReleased) {
+ payload.release();
+ }
+ responseFuture.completeExceptionally(e);
+ }
+ }
+
+    /**
+     * Runs once a request's response future has completed, successfully or not: updates the shared
+     * authentication flag and hands the channel back to the pool.
+     *
+     * @param channel     the pooled channel the request ran on
+     * @param commandCode the command that was sent
+     * @param isLoginOp   whether the command was one of the login commands
+     * @param ex          the failure, or {@code null} when the request succeeded
+     */
+    private void handlePostResponse(Channel channel, int commandCode, boolean isLoginOp, Throwable ex) {
+        boolean succeeded = ex == null;
+        if (isLoginOp && succeeded) {
+            isAuthenticated.set(true);
+        } else if (isLoginOp) {
+            // A failed login drops whatever payload was cached for logging in other channels.
+            releaseLoginPayload();
+        }
+
+        boolean isLogout = commandCode == CommandCode.User.LOGOUT.getValue();
+        if (isLogout) {
+            // Logout is applied regardless of whether the request itself succeeded.
+            isAuthenticated.set(false);
+            IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
+        }
+
+        if (channelPool == null) {
+            return;
+        }
+        channelPool.release(channel);
+    }
+
+    /**
+     * Caches the payload of commands whose payload is kept for later logins on other channels.
+     *
+     * @param commandCode the command about to be sent
+     * @param payload     the command payload; a retained slice of it is stored when it is captured
+     */
+    private void captureLoginPayloadIfNeeded(int commandCode, ByteBuf payload) {
+        // NOTE(review): User.UPDATE payloads are cached as if they were login payloads - confirm
+        // the two payload layouts are compatible.
+        boolean isUserLogin = commandCode == CommandCode.User.LOGIN.getValue();
+        boolean isUserUpdate = commandCode == CommandCode.User.UPDATE.getValue();
+        if (!isUserLogin && !isUserUpdate) {
+            return;
+        }
+        updateLoginPayload(payload);
+    }
+
+    /**
+     * Replaces the cached login payload with a retained slice of {@code payload}, releasing the
+     * previously cached buffer first.
+     *
+     * @param payload the buffer to cache a retained slice of
+     */
+    private synchronized void updateLoginPayload(ByteBuf payload) {
+        ByteBuf previous = this.loginPayload;
+        if (previous != null) {
+            previous.release();
+        }
+        ByteBuf retained = payload.retainedSlice();
+        this.loginPayload = retained;
+    }
+
+    /**
+     * Releases the cached login payload, if there is one, and clears the field.
+     */
+    private synchronized void releaseLoginPayload() {
+        if (this.loginPayload == null) {
+            return;
+        }
+        loginPayload.release();
+        this.loginPayload = null;
+    }
/**
* Closes the connection and releases resources.
+ *
+ * <p>Only the first call closes the channel pool and shuts the event loop group down; every
+ * later call returns an already-completed future.
+ *
+ * @return a future completed when the event loop group has shut down, or completed
+ * exceptionally with the cause of the shutdown failure
*/
public CompletableFuture<Void> close() {
- CompletableFuture<Void> future = new CompletableFuture<>();
-
- if (channel != null && channel.isActive()) {
- channel.close().addListener((ChannelFutureListener) channelFuture
-> {
- eventLoopGroup.shutdownGracefully();
- if (channelFuture.isSuccess()) {
- future.complete(null);
+ if (isClosed.compareAndSet(false, true)) {
+ if (channelPool != null) {
+ channelPool.close();
+ }
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ eventLoopGroup.shutdownGracefully().addListener(f -> {
+ if (f.isSuccess()) {
+ shutdownFuture.complete(null);
} else {
- future.completeExceptionally(channelFuture.cause());
+ // Pass the real cause: completeExceptionally(null) throws NullPointerException inside
+ // this listener and would leave the returned future pending forever.
+ shutdownFuture.completeExceptionally(f.cause());
}
});
- } else {
- eventLoopGroup.shutdownGracefully();
- future.complete(null);
+ return shutdownFuture;
}
-
- return future;
+ return CompletableFuture.completedFuture(null);
}
/**
* Response handler that correlates responses with requests.
*/
- private static class IggyResponseHandler extends
SimpleChannelInboundHandler<ByteBuf> {
- private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>>
pendingRequests;
+ public static class IggyResponseHandler extends
SimpleChannelInboundHandler<ByteBuf> {
+ private final Queue<CompletableFuture<ByteBuf>> responseQueue = new
ConcurrentLinkedQueue<>();
+ private SimpleChannelPool pool;
+
+ /** Creates a handler that is not yet associated with any channel pool. */
+ public IggyResponseHandler() {
+ this.pool = null;
+ }
+
+ /**
+ * Stores the pool that {@code exceptionCaught} releases this handler's channel back to.
+ *
+ * @param pool the owning channel pool; when {@code null}, {@code exceptionCaught} skips the release
+ */
+ public void setPool(SimpleChannelPool pool) {
+ this.pool = pool;
+ }
- public IggyResponseHandler(ConcurrentHashMap<Long,
CompletableFuture<ByteBuf>> pendingRequests) {
- this.pendingRequests = pendingRequests;
+ /**
+ * Appends a future to the queue of requests waiting for a response on this channel.
+ * Each response read from the channel completes the future at the head of that queue.
+ *
+ * @param future the future to complete with the matching response or failure
+ */
+ public void enqueueRequest(CompletableFuture<ByteBuf> future) {
+ responseQueue.add(future);
}
@Override
@@ -298,36 +385,119 @@ public class AsyncTcpConnection {
int status = msg.readIntLE();
int length = msg.readIntLE();
- // Since Iggy doesn't use request IDs, we process responses in
order
- // Get the oldest pending request
- if (!pendingRequests.isEmpty()) {
- Long oldestRequestId =
-
pendingRequests.keySet().stream().min(Long::compare).orElse(null);
-
- if (oldestRequestId != null) {
- CompletableFuture<ByteBuf> future =
pendingRequests.remove(oldestRequestId);
-
- if (status == 0) {
- // Success - pass the remaining buffer as response
- future.complete(msg.retainedSlice());
- } else {
- // Error - the payload contains the error message
- byte[] errorBytes = length > 0 ? new byte[length] :
new byte[0];
- if (length > 0) {
- msg.readBytes(errorBytes);
- }
-
future.completeExceptionally(IggyServerException.fromTcpResponse(status,
errorBytes));
- }
+ CompletableFuture<ByteBuf> future = responseQueue.poll();
+
+ if (future != null) {
+
+ if (status == 0) {
+ // Success - pass the remaining buffer as response
+ future.complete(msg.retainedSlice());
+ } else {
+ // Error - the payload contains the error message
+
+ byte[] errorBytes = length > 0 ? new byte[length] : new
byte[0];
+ msg.readBytes(errorBytes);
+
future.completeExceptionally(IggyServerException.fromTcpResponse(status,
errorBytes));
}
+ } else {
+ log.error(
+ "Received response on channel {} but no request was
waiting!",
+ ctx.channel().id());
}
}
+ /**
+ * Fails every request still waiting on this channel with {@code cause}, releases the channel
+ * to the pool when one has been set, and closes the channel.
+ */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
- // Fail all pending requests
- pendingRequests.values().forEach(future ->
future.completeExceptionally(cause));
- pendingRequests.clear();
+ // If the connection dies, fail ALL waiting requests for this
connection
+ CompletableFuture<ByteBuf> f;
+ while ((f = responseQueue.poll()) != null) {
+ f.completeExceptionally(cause);
+ }
+ // NOTE(review): failing the futures above presumably already triggers handlePostResponse in
+ // AsyncTcpConnection, which releases the same channel - confirm this second release is intended.
+ if (pool != null) {
+ pool.release(ctx.channel());
+ }
ctx.close();
}
}
+
+    /**
+     * Immutable sizing and timeout settings for the TCP channel pool.
+     */
+    public static class TCPConnectionPoolConfig {
+        private final int maxConnections;
+        private final int maxPendingAcquires;
+        private final long acquireTimeoutMillis;
+
+        /** Creates a configuration that uses the {@link Builder} default for every setting. */
+        public TCPConnectionPoolConfig() {
+            this(
+                    Builder.DEFAULT_MAX_CONNECTION,
+                    Builder.DEFAULT_MAX_PENDING_ACQUIRES,
+                    Builder.DEFAULT_ACQUIRE_TIMEOUT_MILLIS);
+        }
+
+        /**
+         * Creates a configuration from explicit values. No validation is performed here.
+         *
+         * @param maxConnections       maximum number of pooled connections
+         * @param maxPendingAcquires   maximum number of acquire requests allowed to wait
+         * @param acquireTimeoutMillis acquire timeout in milliseconds
+         */
+        public TCPConnectionPoolConfig(int maxConnections, int maxPendingAcquires, long acquireTimeoutMillis) {
+            this.acquireTimeoutMillis = acquireTimeoutMillis;
+            this.maxPendingAcquires = maxPendingAcquires;
+            this.maxConnections = maxConnections;
+        }
+
+        public int getMaxConnections() {
+            return maxConnections;
+        }
+
+        public int getMaxPendingAcquires() {
+            return maxPendingAcquires;
+        }
+
+        public long getAcquireTimeoutMillis() {
+            return acquireTimeoutMillis;
+        }
+
+        /**
+         * Builder for {@link TCPConnectionPoolConfig}. A setting that is never set (left at 0)
+         * falls back to its default when {@link #build()} is called.
+         */
+        public static final class Builder {
+            public static final int DEFAULT_MAX_CONNECTION = 5;
+            public static final int DEFAULT_MAX_PENDING_ACQUIRES = 1000;
+            public static final int DEFAULT_ACQUIRE_TIMEOUT_MILLIS = 3000;
+
+            private int maxConnections;
+            private int maxPendingAcquires;
+            private long acquireTimeoutMillis;
+
+            public Builder() {}
+
+            /** Sets the maximum number of pooled connections; must be positive. */
+            public Builder setMaxConnections(int maxConnections) {
+                rejectNonPositive(maxConnections, "Connection pool size cannot be 0 or negative");
+                this.maxConnections = maxConnections;
+                return this;
+            }
+
+            /** Sets the maximum number of waiting acquire requests; must be positive. */
+            public Builder setMaxPendingAcquires(int maxPendingAcquires) {
+                rejectNonPositive(maxPendingAcquires, "Max Pending Acquires cannot be 0 or negative");
+                this.maxPendingAcquires = maxPendingAcquires;
+                return this;
+            }
+
+            /** Sets the acquire timeout in milliseconds; must be positive. */
+            public Builder setAcquireTimeoutMillis(long acquireTimeoutMillis) {
+                rejectNonPositive(acquireTimeoutMillis, "Acquire timeout cannot be 0 or negative");
+                this.acquireTimeoutMillis = acquireTimeoutMillis;
+                return this;
+            }
+
+            /** Builds the configuration, substituting the default for every setting still at 0. */
+            public TCPConnectionPoolConfig build() {
+                int connections = maxConnections == 0 ? DEFAULT_MAX_CONNECTION : maxConnections;
+                int pendingAcquires = maxPendingAcquires == 0 ? DEFAULT_MAX_PENDING_ACQUIRES : maxPendingAcquires;
+                long timeoutMillis = acquireTimeoutMillis == 0 ? DEFAULT_ACQUIRE_TIMEOUT_MILLIS : acquireTimeoutMillis;
+                return new TCPConnectionPoolConfig(connections, pendingAcquires, timeoutMillis);
+            }
+
+            // Shared guard for the setters: throws when the value is zero or negative.
+            private static void rejectNonPositive(long value, String message) {
+                if (value <= 0) {
+                    throw new IggyInvalidArgumentException(message);
+                }
+            }
+        }
+    }
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
new file mode 100644
index 000000000..b11f17ba5
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.AttributeKey;
+import org.apache.iggy.client.async.tcp.AsyncTcpConnection.IggyResponseHandler;
+import org.apache.iggy.exception.IggyAuthenticationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Static helpers that track, per channel, whether a login has been performed and that perform the
+ * login on demand before other commands are sent on that channel.
+ */
+public final class IggyAuthenticator {
+    private static final AttributeKey<Boolean> AUTH_KEY = AttributeKey.valueOf("AUTH_KEY");
+    private static final Logger log = LoggerFactory.getLogger(IggyAuthenticator.class);
+
+    private IggyAuthenticator() {}
+
+    /**
+     * Makes sure the given channel is authenticated, sending a login frame when it is not.
+     *
+     * <p>This method takes ownership of {@code loginPayload} and releases it on every path,
+     * including the one where the channel is already authenticated.
+     *
+     * @param channel      the channel to authenticate
+     * @param loginPayload the login command payload; released by this method, may be {@code null}
+     * @param commandCode  the login command code placed in the frame header
+     * @return a future completed once the channel is marked authenticated, or completed
+     *         exceptionally when no payload is available, the channel has no response handler,
+     *         or the login fails
+     */
+    public static CompletableFuture<Void> ensureAuthenticated(Channel channel, ByteBuf loginPayload, int commandCode) {
+        Boolean isAuth = channel.attr(AUTH_KEY).get();
+        if (Boolean.TRUE.equals(isAuth)) {
+            // Callers hand over a retained buffer; without this release it leaks on every
+            // request sent over an already-authenticated channel.
+            if (loginPayload != null) {
+                loginPayload.release();
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+        // Reference comparison: equals(null) can never be true and throws on a null receiver.
+        if (loginPayload == null) {
+            return CompletableFuture.failedFuture(
+                    new IggyAuthenticationException(null, commandCode, "login first", null, null));
+        }
+
+        IggyResponseHandler handler = channel.pipeline().get(IggyResponseHandler.class);
+        if (handler == null) {
+            loginPayload.release();
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("Channel missing IggyResponseHandler"));
+        }
+
+        ByteBuf frame;
+        try {
+            frame = IggyFrameEncoder.encode(channel.alloc(), commandCode, loginPayload);
+        } catch (RuntimeException e) {
+            return CompletableFuture.failedFuture(e);
+        } finally {
+            // The payload has been copied into the frame (or encoding failed); it is done either way.
+            loginPayload.release();
+        }
+
+        // Enqueue only once the frame exists, so a failed encode leaves no stale queue entry.
+        CompletableFuture<ByteBuf> loginFuture = new CompletableFuture<>();
+        handler.enqueueRequest(loginFuture);
+        channel.writeAndFlush(frame).addListener((ChannelFutureListener) f -> {
+            if (!f.isSuccess()) {
+                // Netty releases the frame itself when a write fails, so it is not released here.
+                loginFuture.completeExceptionally(f.cause());
+            }
+        });
+
+        return loginFuture.thenAccept(result -> {
+            try {
+                channel.attr(AUTH_KEY).set(true);
+                log.debug("Channel {} authenticated successfully", channel.id());
+            } finally {
+                // The login response body is not needed; release it to avoid a buffer leak.
+                result.release();
+            }
+        });
+    }
+
+    /**
+     * Copies the current value of {@code value} into the channel's authentication attribute.
+     *
+     * @param channel the channel whose attribute is updated
+     * @param value   the flag whose current value is stored
+     */
+    public static void setAuthAttribute(Channel channel, AtomicBoolean value) {
+        channel.attr(AUTH_KEY).set(value.get());
+    }
+
+    /**
+     * Returns the channel's authentication attribute.
+     *
+     * @param channel the channel to inspect
+     * @return the stored flag, or {@code null} when it has never been set
+     */
+    public static Boolean getAuthAttribute(Channel channel) {
+        return channel.attr(AUTH_KEY).get();
+    }
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
new file mode 100644
index 000000000..b26b31743
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds Iggy request frames of the form {@code [payload_size:4][command:4][payload:N]}, where
+ * {@code payload_size} is 4 (the command) plus N, and both integers are written little-endian.
+ */
+public final class IggyFrameEncoder {
+    private static final Logger log = LoggerFactory.getLogger(IggyFrameEncoder.class);
+
+    /** Maximum number of leading frame bytes included in the trace hex dump. */
+    private static final int TRACE_PREVIEW_BYTES = 30;
+
+    private IggyFrameEncoder() {}
+
+    /**
+     * Encodes a command and its payload into a newly allocated frame.
+     *
+     * <p>The payload's reader index is not advanced and the payload is not released; the caller
+     * keeps ownership of it and receives ownership of the returned frame.
+     *
+     * @param alloc       allocator used for the frame buffer
+     * @param commandCode command code written after the length field
+     * @param payload     command payload; its readable bytes are copied into the frame
+     * @return the encoded frame
+     */
+    public static ByteBuf encode(ByteBufAllocator alloc, int commandCode, ByteBuf payload) {
+        // Build the request frame exactly like the blocking client
+        int payloadSize = payload.readableBytes();
+        int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload
+        ByteBuf frame = alloc.buffer(4 + framePayloadSize);
+        try {
+            frame.writeIntLE(framePayloadSize); // Length field (includes command)
+            frame.writeIntLE(commandCode); // Command
+            frame.writeBytes(payload, payload.readerIndex(), payloadSize); // Payload
+        } catch (RuntimeException e) {
+            // Nobody else holds the frame yet, so release it here before propagating.
+            frame.release();
+            throw e;
+        }
+
+        // The preview array and hex string are only built when trace logging is enabled.
+        if (log.isTraceEnabled()) {
+            byte[] frameBytes = new byte[Math.min(frame.readableBytes(), TRACE_PREVIEW_BYTES)];
+            frame.getBytes(0, frameBytes);
+            StringBuilder hex = new StringBuilder();
+            for (byte b : frameBytes) {
+                hex.append(String.format("%02x ", b));
+            }
+            log.trace(
+                    "Sending frame with command: {}, payload size: {}, frame payload size (with command): {}, total frame size: {}",
+                    commandCode,
+                    payloadSize,
+                    framePayloadSize,
+                    frame.readableBytes());
+            log.trace("Frame bytes (hex): {}", hex);
+        }
+
+        return frame;
+    }
+}