This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 3b29d336d refactor(java): use generation-based auth for pooled
channels (#2910)
3b29d336d is described below
commit 3b29d336d2c181f084c85cddf578f9fa3f99dbd9
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Tue Mar 10 11:58:34 2026 +0100
refactor(java): use generation-based auth for pooled channels (#2910)
Pooled connections authenticated with a simple boolean flag
failed to re-authenticate after logout/re-login cycles.
Channels from the prior session appeared authenticated but
held stale server-side state, causing silent failures.
Replace the boolean auth attribute with an AtomicLong
generation counter. Each login increments the generation;
logout invalidates all channels by advancing it. Channels
compare their stored generation on acquire and transparently
re-authenticate when stale.
Additional cleanup: move pool initialization into the
constructor with eager connectivity validation, extract
PoolChannelHandler as a static inner class, fix payload
leak on acquire failure by releasing in finally, fix
login capture to include PAT login (was incorrectly
capturing UPDATE), reduce IggyAuthenticator/FrameEncoder
visibility to package-private, and migrate to Netty 5
IoEventLoopGroup API.
---
.../iggy/client/async/tcp/AsyncIggyTcpClient.java | 4 +-
.../iggy/client/async/tcp/AsyncTcpConnection.java | 278 +++++++++------------
.../iggy/client/async/tcp/IggyAuthenticator.java | 46 ++--
.../iggy/client/async/tcp/IggyFrameEncoder.java | 24 +-
.../client/async/AsyncConnectionPoolAuthTest.java | 263 +++++++++++++++++++
5 files changed, 430 insertions(+), 185 deletions(-)
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 04e5e1d28..cdd2c6716 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -171,7 +171,6 @@ public class AsyncIggyTcpClient {
connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections);
acquireTimeout.ifPresent(timeout ->
poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis()));
TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
- // TCPConnectionPoolConfig poolConfig = new TCPConnectionPoolConfig();
connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate, poolConfig);
return connection.connect().thenRun(() -> {
messagesClient = new MessagesTcpClient(connection);
@@ -195,7 +194,7 @@ public class AsyncIggyTcpClient {
* {@link UsersClient#login(String, String)} instead.
*
* @return a {@link CompletableFuture} that completes with the user's
- * {@link IdentityInfo} on success
+ * {@link IdentityInfo} on success
* @throws IggyMissingCredentialsException if no credentials were provided
at build time
* @throws IggyNotConnectedException if {@link #connect()} has not
been called
*/
@@ -221,6 +220,7 @@ public class AsyncIggyTcpClient {
}
return usersClient;
}
+
/**
* Returns the async messages client for producing and consuming messages.
*
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index ca0a1f2f4..20b49f0f5 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -26,13 +26,13 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
+import io.netty.channel.IoEventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.FixedChannelPool;
-import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
@@ -56,6 +56,7 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -64,23 +65,15 @@ import java.util.function.Function;
*/
public class AsyncTcpConnection {
private static final Logger log =
LoggerFactory.getLogger(AsyncTcpConnection.class);
- private final String host;
- private final int port;
- private final boolean enableTls;
- private final Optional<File> tlsCertificate;
- private final SslContext sslContext;
- private final EventLoopGroup eventLoopGroup;
- private final Bootstrap bootstrap;
- private SimpleChannelPool channelPool;
- private final TCPConnectionPoolConfig poolConfig;
- private ByteBuf loginPayload;
- private AtomicBoolean isAuthenticated = new AtomicBoolean(false);
+ private final IoEventLoopGroup eventLoopGroup;
+ private final FixedChannelPool channelPool;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final AtomicLong authGeneration = new AtomicLong(0);
+ private ByteBuf loginPayload;
- public AsyncTcpConnection(String host, int port) {
- this(host, port, false, Optional.empty(), new
TCPConnectionPoolConfig());
- }
+ private volatile int loginCommandCode;
+ private volatile boolean authenticated = false;
public AsyncTcpConnection(
String host,
@@ -88,77 +81,53 @@ public class AsyncTcpConnection {
boolean enableTls,
Optional<File> tlsCertificate,
TCPConnectionPoolConfig poolConfig) {
- this.host = host;
- this.port = port;
- this.enableTls = enableTls;
- this.tlsCertificate = tlsCertificate;
- this.poolConfig = poolConfig;
- this.eventLoopGroup = new NioEventLoopGroup();
- this.bootstrap = new Bootstrap();
-
- if (this.enableTls) {
+ this.eventLoopGroup = new
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+
+ SslContext sslContext = null;
+ if (enableTls) {
try {
- SslContextBuilder builder = SslContextBuilder.forClient();
- this.tlsCertificate.ifPresent(builder::trustManager);
- this.sslContext = builder.build();
+ SslContextBuilder sslBuilder = SslContextBuilder.forClient();
+ tlsCertificate.ifPresent(sslBuilder::trustManager);
+ sslContext = sslBuilder.build();
} catch (SSLException e) {
throw new IggyTlsException("Failed to build SSL context for
AsyncTcpConnection", e);
}
- } else {
- this.sslContext = null;
}
- configureBootstrap();
- }
- private void configureBootstrap() {
- bootstrap
+ var bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .remoteAddress(this.host, this.port);
- }
-
- /**
- * Initialises Connection pool.
- */
- public CompletableFuture<Void> connect() {
- if (isClosed.get()) {
- return CompletableFuture.failedFuture(new
IggyClientException("Client is Closed"));
- }
- AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
- @Override
- public void channelCreated(Channel ch) {
- ChannelPipeline pipeline = ch.pipeline();
- if (enableTls) {
- // adding ssl if ssl enabled
- pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
- }
- // Adding the FrameDecoder to end of channel pipeline
- pipeline.addLast("frameDecoder", new IggyFrameDecoder());
-
- // Adding Response Handler Now Stateful
- pipeline.addLast("responseHandler", new IggyResponseHandler());
- }
-
- @Override
- public void channelAcquired(Channel ch) {
- IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
- handler.setPool(channelPool);
- }
- };
+ .remoteAddress(host, port);
this.channelPool = new FixedChannelPool(
bootstrap,
- poolHandler,
- ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
- FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ new PoolChannelHandler(host, port, enableTls, sslContext),
+ ChannelHealthChecker.ACTIVE,
+ FixedChannelPool.AcquireTimeoutAction.FAIL,
poolConfig.getAcquireTimeoutMillis(),
poolConfig.getMaxConnections(),
poolConfig.getMaxPendingAcquires());
+
log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
- return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Validates server reachability by eagerly acquiring and releasing one
connection.
+ */
+ public CompletableFuture<Void> connect() {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ channelPool.acquire().addListener((FutureListener<Channel>) f -> {
+ if (f.isSuccess()) {
+ channelPool.release(f.getNow());
+ future.complete(null);
+ } else {
+ future.completeExceptionally(f.cause());
+ }
+ });
+ return future;
}
public <T> CompletableFuture<T> exchangeForEntity(
@@ -205,131 +174,134 @@ public class AsyncTcpConnection {
}
public CompletableFuture<Void> sendAndRelease(CommandCode commandCode,
ByteBuf payload) {
- return send(commandCode, payload).thenAccept(response ->
response.release());
+ return send(commandCode, payload).thenAccept(ByteBuf::release);
}
public CompletableFuture<ByteBuf> send(CommandCode commandCode, ByteBuf
payload) {
return send(commandCode.getValue(), payload);
}
- /**
- * Sends a command asynchronously and returns the response.
- * Uses Netty's EventLoop to ensure thread-safe sequential request
processing with FIFO response matching.
- */
public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) {
- if (isClosed.get()) {
- return CompletableFuture.failedFuture(
- new IggyNotConnectedException("Connection not established
or closed"));
- }
- if (channelPool == null) {
- return CompletableFuture.failedFuture(
- new IggyNotConnectedException("Connection not established
or closed"));
- }
-
captureLoginPayloadIfNeeded(commandCode, payload);
CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
channelPool.acquire().addListener((FutureListener<Channel>) f -> {
if (!f.isSuccess()) {
- responseFuture.completeExceptionally(f.cause());
+ payload.release();
+
responseFuture.completeExceptionally(mapAcquireException(f.cause()));
return;
}
Channel channel = f.getNow();
- if (Boolean.FALSE.equals(isAuthenticated.get())) {
- IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
- }
- CompletableFuture<Void> authStep;
boolean isLoginOp = (commandCode ==
CommandCode.User.LOGIN.getValue()
|| commandCode ==
CommandCode.PersonalAccessToken.LOGIN.getValue());
+ responseFuture.handle((res, ex) -> {
+ handlePostResponse(channel, commandCode, isLoginOp, ex);
+ return null;
+ });
+
+ CompletableFuture<Void> authStep;
if (isLoginOp) {
authStep = CompletableFuture.completedFuture(null);
+ } else if (!authenticated) {
+ payload.release();
+ responseFuture.completeExceptionally(
+ new IggyNotConnectedException("Not authenticated, call
login first"));
+ return;
} else {
- if (loginPayload == null) {
- responseFuture.completeExceptionally(new
IggyNotConnectedException("Login First"));
+ ByteBuf loginPayloadCopy = getLoginPayloadCopy();
+ if (loginPayloadCopy == null) {
+ payload.release();
+ responseFuture.completeExceptionally(
+ new IggyNotConnectedException("Not authenticated,
call login first"));
+ return;
}
authStep = IggyAuthenticator.ensureAuthenticated(
- channel, loginPayload.retainedDuplicate(),
CommandCode.User.LOGIN.getValue());
+ channel, loginPayloadCopy, loginCommandCode,
authGeneration);
}
authStep.thenRun(() -> sendFrame(channel, payload, commandCode,
responseFuture))
.exceptionally(ex -> {
- payload.release();
responseFuture.completeExceptionally(ex);
return null;
});
-
- responseFuture.handle((res, ex) -> {
- handlePostResponse(channel, commandCode, isLoginOp, ex);
- return null;
- });
});
return responseFuture;
}
+ private static Throwable mapAcquireException(Throwable cause) {
+ if (cause instanceof IllegalStateException) {
+ return new IggyNotConnectedException("Connection pool is closed");
+ }
+ return cause;
+ }
+
private void sendFrame(
Channel channel, ByteBuf payload, int commandCode,
CompletableFuture<ByteBuf> responseFuture) {
try {
-
IggyResponseHandler handler =
channel.pipeline().get(IggyResponseHandler.class);
if (handler == null) {
throw new IggyClientException("Channel missing
IggyResponseHandler");
}
- // Enqueuing request so handler knows who to call back;
handler.enqueueRequest(responseFuture);
-
ByteBuf frame = IggyFrameEncoder.encode(channel.alloc(),
commandCode, payload);
- payload.release();
-
- // Send the frame
channel.writeAndFlush(frame).addListener((ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
log.error("Failed to send frame: {}",
future.cause().getMessage());
- frame.release();
- channel.close();
responseFuture.completeExceptionally(future.cause());
} else {
log.trace("Frame sent successfully to {}",
channel.remoteAddress());
}
});
-
} catch (RuntimeException e) {
responseFuture.completeExceptionally(e);
+ } finally {
+ payload.release();
}
}
private void handlePostResponse(Channel channel, int commandCode, boolean
isLoginOp, Throwable ex) {
if (isLoginOp) {
if (ex == null) {
- isAuthenticated.set(true);
+ authenticated = true;
+ long generation = authGeneration.incrementAndGet();
+ IggyAuthenticator.setAuthGeneration(channel, generation);
} else {
releaseLoginPayload();
}
}
if (commandCode == CommandCode.User.LOGOUT.getValue()) {
- isAuthenticated.set(false);
- IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
- }
- if (channelPool != null) {
- channelPool.release(channel);
+ authenticated = false;
+ authGeneration.incrementAndGet();
+ IggyAuthenticator.clearAuthGeneration(channel);
}
+ channelPool.release(channel);
}
private void captureLoginPayloadIfNeeded(int commandCode, ByteBuf payload)
{
- if (commandCode == CommandCode.User.LOGIN.getValue() || commandCode ==
CommandCode.User.UPDATE.getValue()) {
- updateLoginPayload(payload);
+ if (commandCode == CommandCode.User.LOGIN.getValue()
+ || commandCode ==
CommandCode.PersonalAccessToken.LOGIN.getValue()) {
+ updateLoginPayload(commandCode, payload);
}
}
- private synchronized void updateLoginPayload(ByteBuf payload) {
+ private synchronized void updateLoginPayload(int commandCode, ByteBuf
payload) {
if (this.loginPayload != null) {
loginPayload.release();
}
this.loginPayload = payload.retainedSlice();
+ this.loginCommandCode = commandCode;
+ }
+
+ private synchronized ByteBuf getLoginPayloadCopy() {
+ if (this.loginPayload != null) {
+ return loginPayload.retainedDuplicate();
+ }
+ return null;
}
private synchronized void releaseLoginPayload() {
@@ -339,41 +311,49 @@ public class AsyncTcpConnection {
}
}
- /**
- * Closes the connection and releases resources.
- */
public CompletableFuture<Void> close() {
- if (isClosed.compareAndSet(false, true)) {
- if (channelPool != null) {
- channelPool.close();
- }
- CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
- eventLoopGroup.shutdownGracefully().addListener(f -> {
- if (f.isSuccess()) {
- shutdownFuture.complete(null);
- } else {
- shutdownFuture.completeExceptionally(null);
- }
- });
- return shutdownFuture;
+ if (!isClosed.compareAndSet(false, true)) {
+ return CompletableFuture.completedFuture(null);
}
- return CompletableFuture.completedFuture(null);
+ releaseLoginPayload();
+ channelPool.close();
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ eventLoopGroup.shutdownGracefully().addListener(f -> {
+ if (f.isSuccess()) {
+ shutdownFuture.complete(null);
+ } else {
+ shutdownFuture.completeExceptionally(f.cause());
+ }
+ });
+ return shutdownFuture;
}
- /**
- * Response handler that correlates responses with requests.
- */
- public static class IggyResponseHandler extends
SimpleChannelInboundHandler<ByteBuf> {
- private final Queue<CompletableFuture<ByteBuf>> responseQueue = new
ConcurrentLinkedQueue<>();
- private SimpleChannelPool pool;
-
- public IggyResponseHandler() {
- this.pool = null;
+ private static final class PoolChannelHandler extends
AbstractChannelPoolHandler {
+ private final String host;
+ private final int port;
+ private final boolean enableTls;
+ private final SslContext sslContext;
+
+ PoolChannelHandler(String host, int port, boolean enableTls,
SslContext sslContext) {
+ this.host = host;
+ this.port = port;
+ this.enableTls = enableTls;
+ this.sslContext = sslContext;
}
- public void setPool(SimpleChannelPool pool) {
- this.pool = pool;
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
}
+ }
+
+ public static class IggyResponseHandler extends
SimpleChannelInboundHandler<ByteBuf> {
+ private final Queue<CompletableFuture<ByteBuf>> responseQueue = new
ConcurrentLinkedQueue<>();
public void enqueueRequest(CompletableFuture<ByteBuf> future) {
responseQueue.add(future);
@@ -381,20 +361,15 @@ public class AsyncTcpConnection {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- // Read response header (status and length only - no request ID)
int status = msg.readIntLE();
int length = msg.readIntLE();
CompletableFuture<ByteBuf> future = responseQueue.poll();
if (future != null) {
-
if (status == 0) {
- // Success - pass the remaining buffer as response
future.complete(msg.retainedSlice());
} else {
- // Error - the payload contains the error message
-
byte[] errorBytes = length > 0 ? new byte[length] : new
byte[0];
msg.readBytes(errorBytes);
future.completeExceptionally(IggyServerException.fromTcpResponse(status,
errorBytes));
@@ -408,14 +383,10 @@ public class AsyncTcpConnection {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
- // If the connection dies, fail ALL waiting requests for this
connection
CompletableFuture<ByteBuf> f;
while ((f = responseQueue.poll()) != null) {
f.completeExceptionally(cause);
}
- if (pool != null) {
- pool.release(ctx.channel());
- }
ctx.close();
}
}
@@ -450,7 +421,6 @@ public class AsyncTcpConnection {
return this.acquireTimeoutMillis;
}
- // Builder Class for TCPConnectionPoolConfig
public static final class Builder {
public static final int DEFAULT_MAX_CONNECTION = 5;
public static final int DEFAULT_MAX_PENDING_ACQUIRES = 1000;
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
index b11f17ba5..71bfcdb60 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
@@ -24,27 +24,42 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.AttributeKey;
import org.apache.iggy.client.async.tcp.AsyncTcpConnection.IggyResponseHandler;
-import org.apache.iggy.exception.IggyAuthenticationException;
+import org.apache.iggy.exception.IggyNotConnectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-public final class IggyAuthenticator {
- private static final AttributeKey<Boolean> AUTH_KEY =
AttributeKey.valueOf("AUTH_KEY");
+final class IggyAuthenticator {
private static final Logger log =
LoggerFactory.getLogger(IggyAuthenticator.class);
+ private static final AttributeKey<Long> AUTH_GENERATION_KEY =
AttributeKey.valueOf("AUTH_GENERATION");
private IggyAuthenticator() {}
- public static CompletableFuture<Void> ensureAuthenticated(Channel channel,
ByteBuf loginPayload, int commandCode) {
- Boolean isAuth = channel.attr(AUTH_KEY).get();
- if (Boolean.TRUE.equals(isAuth)) {
+ /**
+ * Ensures the channel is authenticated for the current authentication
generation.
+ * If the channel's stored generation matches the current one, it is
already authenticated.
+ * Otherwise, sends a login command on the channel and updates the
generation on success.
+ *
+ * @param channel the channel to authenticate
+ * @param loginPayload the login payload to send (will be released by
this method)
+ * @param commandCode the login command code
+ * @param currentGeneration the current authentication generation counter
+ * @return a future that completes when authentication is done
+ */
+ static CompletableFuture<Void> ensureAuthenticated(
+ Channel channel, ByteBuf loginPayload, int commandCode, AtomicLong
currentGeneration) {
+ Long channelGeneration = channel.attr(AUTH_GENERATION_KEY).get();
+ long requiredGeneration = currentGeneration.get();
+
+ if (channelGeneration != null && channelGeneration ==
requiredGeneration) {
+ loginPayload.release();
return CompletableFuture.completedFuture(null);
}
- if (loginPayload.equals(null)) {
- return CompletableFuture.failedFuture(
- new IggyAuthenticationException(null, commandCode, "login
first", null, null));
+
+ if (loginPayload == null) {
+ return CompletableFuture.failedFuture(new
IggyNotConnectedException("Not authenticated, call login first"));
}
CompletableFuture<ByteBuf> loginFuture = new CompletableFuture<>();
@@ -54,14 +69,13 @@ public final class IggyAuthenticator {
loginPayload.release();
channel.writeAndFlush(frame).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
- frame.release();
loginFuture.completeExceptionally(f.cause());
}
});
return loginFuture.thenAccept(result -> {
try {
- channel.attr(AUTH_KEY).set(true);
+ channel.attr(AUTH_GENERATION_KEY).set(currentGeneration.get());
log.debug("Channel {} authenticated successfully",
channel.id());
} finally {
result.release();
@@ -69,11 +83,11 @@ public final class IggyAuthenticator {
});
}
- public static void setAuthAttribute(Channel channel, AtomicBoolean value) {
- channel.attr(AUTH_KEY).set(value.get());
+ static void setAuthGeneration(Channel channel, long generation) {
+ channel.attr(AUTH_GENERATION_KEY).set(generation);
}
- public static Boolean getAuthAttribute(Channel channel) {
- return channel.attr(AUTH_KEY).get();
+ static void clearAuthGeneration(Channel channel) {
+ channel.attr(AUTH_GENERATION_KEY).set(null);
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
index b26b31743..87f0eb479 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
@@ -24,26 +24,24 @@ import io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class IggyFrameEncoder {
+final class IggyFrameEncoder {
private static final Logger log =
LoggerFactory.getLogger(IggyFrameEncoder.class);
private IggyFrameEncoder() {}
- public static ByteBuf encode(ByteBufAllocator alloc, int commandCode,
ByteBuf payload) {
-
- // Build the request frame exactly like the blocking client
- // Frame format: [payload_size:4][command:4][payload:N]
- // where payload_size = 4 (command size) + N (payload size)
+ /**
+ * Encodes a command into the Iggy TCP frame format:
[payload_size:4][command:4][payload:N]
+ */
+ static ByteBuf encode(ByteBufAllocator alloc, int commandCode, ByteBuf
payload) {
int payloadSize = payload.readableBytes();
- int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload
+ int framePayloadSize = 4 + payloadSize;
ByteBuf frame = alloc.buffer(4 + framePayloadSize);
- frame.writeIntLE(framePayloadSize); // Length field (includes command)
- frame.writeIntLE(commandCode); // Command
- frame.writeBytes(payload, payload.readerIndex(), payloadSize); //
Payload
+ frame.writeIntLE(framePayloadSize);
+ frame.writeIntLE(commandCode);
+ frame.writeBytes(payload);
- // Debug: print frame bytes
- byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)];
if (log.isTraceEnabled()) {
+ byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)];
frame.getBytes(0, frameBytes);
StringBuilder hex = new StringBuilder();
for (byte b : frameBytes) {
@@ -55,7 +53,7 @@ public final class IggyFrameEncoder {
payloadSize,
framePayloadSize,
frame.readableBytes());
- log.trace("Frame bytes (hex): {}", hex.toString());
+ log.trace("Frame bytes (hex): {}", hex);
}
return frame;
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java
new file mode 100644
index 000000000..ce7fd59f6
--- /dev/null
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.client.BaseIntegrationTest;
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.exception.IggyNotConnectedException;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for connection pool authentication lifecycle.
+ * Verifies that lazy per-channel authentication works correctly across
+ * login, logout, and re-login cycles with a pooled connection.
+ */
+@DisplayName("Connection Pool Authentication")
+class AsyncConnectionPoolAuthTest extends BaseIntegrationTest {
+ private static final Logger log =
LoggerFactory.getLogger(AsyncConnectionPoolAuthTest.class);
+
+ private static final String USERNAME = "iggy";
+ private static final String PASSWORD = "iggy";
+ private static final int POOL_SIZE = 3;
+
+ private AsyncIggyTcpClient client;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ client = AsyncIggyTcpClient.builder()
+ .host(serverHost())
+ .port(serverTcpPort())
+ .connectionPoolSize(POOL_SIZE)
+ .build();
+ client.connect().get(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (client != null) {
+ client.close().get(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ @DisplayName("should reject commands before login")
+ void shouldRejectCommandsBeforeLogin() {
+ // when/then
+ assertThatThrownBy(() -> client.streams().getStreams().get(5,
TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(IggyNotConnectedException.class);
+ }
+
+ @Test
+ @DisplayName("should execute commands after login")
+ void shouldExecuteCommandsAfterLogin() throws Exception {
+ // given
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+ // when
+ var streams = client.streams().getStreams().get(5, TimeUnit.SECONDS);
+
+ // then
+ assertThat(streams).isNotNull();
+ }
+
+ @Test
+ @DisplayName("should login, logout, and re-login successfully")
+ void shouldLoginLogoutAndReLogin() throws Exception {
+ // given
+ String streamName = "auth-test-" + UUID.randomUUID();
+
+ // when - first login
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+ var streamsAfterLogin = client.streams().getStreams().get(5,
TimeUnit.SECONDS);
+
+ // then
+ assertThat(streamsAfterLogin).isNotNull();
+ log.info("First login successful, got {} streams",
streamsAfterLogin.size());
+
+ // when - create a stream to verify full functionality
+ var stream = client.streams().createStream(streamName).get(5,
TimeUnit.SECONDS);
+ assertThat(stream).isNotNull();
+ assertThat(stream.name()).isEqualTo(streamName);
+
+ // when - logout
+ client.users().logout().get(5, TimeUnit.SECONDS);
+ log.info("Logout successful");
+
+ // then - commands should fail after logout
+ assertThatThrownBy(() -> client.streams().getStreams().get(5,
TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(IggyNotConnectedException.class);
+ log.info("Commands correctly rejected after logout");
+
+ // when - re-login
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+ log.info("Re-login successful");
+
+ // then - commands should work again
+ var streamsAfterReLogin = client.streams().getStreams().get(5,
TimeUnit.SECONDS);
+ assertThat(streamsAfterReLogin).isNotNull();
+
+ // cleanup
+ client.streams().deleteStream(StreamId.of(streamName)).get(5,
TimeUnit.SECONDS);
+ }
+
+ @Test
+ @DisplayName("should authenticate all pool channels lazily via concurrent
requests")
+ void shouldAuthenticatePoolChannelsLazily() throws Exception {
+ // given
+ String streamName = "pool-auth-test-" + UUID.randomUUID();
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+ client.streams().createStream(streamName).get(5, TimeUnit.SECONDS);
+ client.topics()
+ .createTopic(
+ StreamId.of(streamName),
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ "test-topic")
+ .get(5, TimeUnit.SECONDS);
+
+ // when - fire more concurrent requests than the pool size to force
+ // multiple channels to be created and lazily authenticated
+ int concurrentRequests = POOL_SIZE * 3;
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (int i = 0; i < concurrentRequests; i++) {
+ var future = client.messages()
+ .sendMessages(
+ StreamId.of(streamName),
+
org.apache.iggy.identifier.TopicId.of("test-topic"),
+ Partitioning.partitionId(0L),
+ List.of(Message.of("msg-" + i)));
+ futures.add(future);
+ }
+
+ // then - all requests should complete successfully
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).get(15, TimeUnit.SECONDS);
+ log.info("All {} concurrent requests completed successfully",
concurrentRequests);
+
+ // cleanup
+ client.streams().deleteStream(StreamId.of(streamName)).get(5,
TimeUnit.SECONDS);
+ }
+
+ @Test
+ @DisplayName("should re-authenticate stale channels after logout and
re-login")
+ void shouldReAuthenticateStaleChannelsAfterReLogin() throws Exception {
+ // given - login and warm up multiple pool channels
+ String streamName = "reauth-test-" + UUID.randomUUID();
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+ client.streams().createStream(streamName).get(5, TimeUnit.SECONDS);
+ client.topics()
+ .createTopic(
+ StreamId.of(streamName),
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ "test-topic")
+ .get(5, TimeUnit.SECONDS);
+
+ List<CompletableFuture<Void>> warmupFutures = new ArrayList<>();
+ for (int i = 0; i < POOL_SIZE * 2; i++) {
+ warmupFutures.add(client.messages()
+ .sendMessages(
+ StreamId.of(streamName),
+
org.apache.iggy.identifier.TopicId.of("test-topic"),
+ Partitioning.partitionId(0L),
+ List.of(Message.of("warmup-" + i))));
+ }
+ CompletableFuture.allOf(warmupFutures.toArray(new
CompletableFuture[0])).get(10, TimeUnit.SECONDS);
+ log.info("Pool warmed up with {} requests", warmupFutures.size());
+
+ // when - logout and re-login (invalidates all channel auth
generations)
+ client.users().logout().get(5, TimeUnit.SECONDS);
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+ log.info("Logout + re-login complete");
+
+ // then - all channels should re-authenticate transparently
+ List<CompletableFuture<Void>> postReLoginFutures = new ArrayList<>();
+ for (int i = 0; i < POOL_SIZE * 2; i++) {
+ postReLoginFutures.add(client.messages()
+ .sendMessages(
+ StreamId.of(streamName),
+
org.apache.iggy.identifier.TopicId.of("test-topic"),
+ Partitioning.partitionId(0L),
+ List.of(Message.of("after-relogin-" + i))));
+ }
+ CompletableFuture.allOf(postReLoginFutures.toArray(new
CompletableFuture[0]))
+ .get(15, TimeUnit.SECONDS);
+ log.info("All {} post-re-login requests succeeded",
postReLoginFutures.size());
+
+ // cleanup
+ client.streams().deleteStream(StreamId.of(streamName)).get(5,
TimeUnit.SECONDS);
+ }
+
+ @Test
+ @DisplayName("should handle multiple sequential login-logout cycles")
+ void shouldHandleMultipleLoginLogoutCycles() throws Exception {
+ // given
+ int cycles = 3;
+
+ for (int i = 0; i < cycles; i++) {
+ // when - login
+ client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+ // then - verify commands work
+ var streams = client.streams().getStreams().get(5,
TimeUnit.SECONDS);
+ assertThat(streams).isNotNull();
+ log.info("Cycle {}: login and command succeeded", i + 1);
+
+ // when - logout
+ client.users().logout().get(5, TimeUnit.SECONDS);
+
+ // then - verify commands are rejected
+ assertThatThrownBy(() -> client.streams().getStreams().get(5,
TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(IggyNotConnectedException.class);
+ log.info("Cycle {}: logout and rejection verified", i + 1);
+ }
+ }
+}