mmodzelewski commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2849167099


##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -242,36 +334,94 @@ protected void channelRead0(ChannelHandlerContext ctx, 
ByteBuf msg) {
             int status = msg.readIntLE();
             int length = msg.readIntLE();
 
-            // Since Iggy doesn't use request IDs, we process responses in 
order
-            // Get the oldest pending request
-            if (!pendingRequests.isEmpty()) {
-                Long oldestRequestId =
-                        
pendingRequests.keySet().stream().min(Long::compare).orElse(null);
+            CompletableFuture<ByteBuf> future = responseQueue.poll();
 
-                if (oldestRequestId != null) {
-                    CompletableFuture<ByteBuf> future = 
pendingRequests.remove(oldestRequestId);
+            if (future != null) {
 
-                    if (status == 0) {
-                        // Success - pass the remaining buffer as response
-                        future.complete(msg.retainedSlice());
-                    } else {
-                        // Error - the payload contains the error message
-                        byte[] errorBytes = length > 0 ? new byte[length] : 
new byte[0];
-                        if (length > 0) {
-                            msg.readBytes(errorBytes);
-                        }
+                if (status == 0) {
+                    // Success - pass the remaining buffer as response
+                    future.complete(msg.retainedSlice());
+                } else {
+                    // Error - the payload contains the error message
+                    if (length > 0) {
+                        byte[] errorBytes = new byte[length];
+                        msg.readBytes(errorBytes);
                         
future.completeExceptionally(IggyServerException.fromTcpResponse(status, 
errorBytes));
+                    } else {
+                        future.completeExceptionally(new 
IggyServerException(status));
                     }
                 }
+            } else {
+                log.error(
+                        "Received response on channel {} but no request was 
waiting!",
+                        ctx.channel().id());
+            }
+            if (pool != null) {
+                pool.release(ctx.channel());
             }
         }
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {

Review Comment:
   A channel should also be released to the pool in case of an exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to