szetszwo commented on code in PR #1285:
URL: https://github.com/apache/ratis/pull/1285#discussion_r2357399924


##########
ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java:
##########
@@ -121,6 +124,35 @@ protected void channelRead0(ChannelHandlerContext ctx,
             future.complete(proto);
           }
         }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+          if (!replies.isEmpty()) {
+            LOG.error(
+              "Still have {} requests outstanding when caught exception from {} connection",
+              replies.size(),
+              peer,
+              cause);
+            failOutstandingRequests(cause);
+          }
+          client.close();
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) {
+          if (!replies.isEmpty()) {
+            LOG.error(
+              "Still have {} requests outstanding when connection from {} is closed",
+              replies.size(),
+              peer);
+            failOutstandingRequests(new IOException("Connection to " + peer + " is closed."));
+          }
+        }
+
+        private void failOutstandingRequests(Throwable cause) {
+          replies.forEach(f -> f.completeExceptionally(cause));
+          replies.clear();
+        }

Review Comment:
   Let's move `failOutstandingRequests` out of the handler.  Then, it can also be used in `close()`.
   
   BTW, could you also change the `IOException` to `AlreadyClosedException`?
   ```diff
   @@ -153,9 +169,14 @@ public class NettyRpcProxy implements Closeable {
        @Override
        public synchronized void close() {
          client.close();
   +      failOutstandingRequests(new AlreadyClosedException("Closing connection to " + peer));
   +    }
   +
   +    private void failOutstandingRequests(Throwable cause) {
          if (!replies.isEmpty()) {
   -        final IOException e = new IOException("Connection to " + peer + " is closed.");
   -        replies.stream().forEach(f -> f.completeExceptionally(e));
   +        LOG.warn("Still have {} requests outstanding from {} connection: {}",
   +            replies.size(), peer, cause.toString());
   +        replies.forEach(f -> f.completeExceptionally(cause));
            replies.clear();
          }
        }
   ```
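
For illustration only, here is a minimal, self-contained sketch of the shared-helper shape suggested in the diff above. It does not use the Ratis or Netty classes: `ProxySketch`, its `send()` and `outstanding()` methods, and the nested `AlreadyClosedException` are hypothetical stand-ins, and the real `close()` also closes the Netty client.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the proxy; not the actual NettyRpcProxy. */
class ProxySketch implements Closeable {
  /** Stand-in for the Ratis AlreadyClosedException (assumed here to be an IOException). */
  static class AlreadyClosedException extends IOException {
    AlreadyClosedException(String message) {
      super(message);
    }
  }

  private final String peer;
  private final Queue<CompletableFuture<String>> replies = new ConcurrentLinkedQueue<>();

  ProxySketch(String peer) {
    this.peer = peer;
  }

  /** Registers a pending reply; a real proxy would also write the request to the channel. */
  synchronized CompletableFuture<String> send() {
    final CompletableFuture<String> f = new CompletableFuture<>();
    replies.add(f);
    return f;
  }

  @Override
  public synchronized void close() {
    // The same helper can be called from the channel handler callbacks.
    failOutstandingRequests(new AlreadyClosedException("Closing connection to " + peer));
  }

  /** The single place that fails and then clears every pending reply. */
  private synchronized void failOutstandingRequests(Throwable cause) {
    if (!replies.isEmpty()) {
      replies.forEach(f -> f.completeExceptionally(cause));
      replies.clear();
    }
  }

  /** Number of replies still pending (for inspection in this sketch only). */
  synchronized int outstanding() {
    return replies.size();
  }
}
```

With this shape, a future obtained from `send()` before `close()` completes exceptionally with the stand-in `AlreadyClosedException`, and a second `close()` is a no-op for the reply queue.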



##########
ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java:
##########
@@ -121,6 +124,35 @@ protected void channelRead0(ChannelHandlerContext ctx,
             future.complete(proto);
           }
         }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+          if (!replies.isEmpty()) {
+            LOG.error(
+              "Still have {} requests outstanding when caught exception from {} connection",
+              replies.size(),
+              peer,
+              cause);
+            failOutstandingRequests(cause);
+          }
+          client.close();
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) {
+          if (!replies.isEmpty()) {
+            LOG.error(
+              "Still have {} requests outstanding when connection from {} is closed",
+              replies.size(),
+              peer);
+            failOutstandingRequests(new IOException("Connection to " + peer + " is closed."));
+          }
+        }

Review Comment:
   Then, these two methods become very short.  BTW, let's
   - Wrap the cause in an `IOException` in the first method so that the message can include the peer.
   - Use `AlreadyClosedException` in the second method, with a different message.
   ```java
           @Override
           public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
             failOutstandingRequests(new IOException("Caught an exception for the connection to " + peer, cause));
             client.close();
           }
   
           @Override
           public void channelInactive(ChannelHandlerContext ctx) {
             failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive."));
           }
   ```
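
As a rough illustration of why wrapping the cause helps, the sketch below shows what a caller waiting on a failed reply would observe: an `IOException` whose message names the peer, with the original exception still reachable through `getCause()`. The class `WrapCauseSketch` and its `wrap` and `observe` helpers are hypothetical names for this example and are not part of Ratis.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical helper class for illustration; not part of Ratis. */
class WrapCauseSketch {
  /** Wraps a low-level failure so that the message names the peer. */
  static IOException wrap(String peer, Throwable cause) {
    return new IOException("Caught an exception for the connection to " + peer, cause);
  }

  /** Returns the exception a caller blocked on get() would observe, or null on success. */
  static Throwable observe(CompletableFuture<?> future) {
    try {
      future.get();
      return null;
    } catch (ExecutionException e) {
      // get() reports an exceptional completion as the cause of an ExecutionException.
      return e.getCause();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    final CompletableFuture<String> reply = new CompletableFuture<>();
    final RuntimeException lowLevel = new RuntimeException("connection reset");
    reply.completeExceptionally(wrap("peer1", lowLevel));

    final Throwable seen = observe(reply);
    System.out.println(seen.getMessage());
    System.out.println(seen.getCause() == lowLevel);
  }
}
```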


