Author: fhanik
Date: Mon Jul  2 11:46:20 2007
New Revision: 552560

URL: http://svn.apache.org/viewvc?view=rev&rev=552560
Log:
Force closure of connections upon a server shutdown

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    
tomcat/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?view=diff&rev=552560&r1=552559&r2=552560
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
Mon Jul  2 11:46:20 2007
@@ -37,6 +37,7 @@
 import java.util.LinkedList;
 import java.util.Set;
 import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedSelectorException;
 
 /**
  * @author Filip Hanik
@@ -303,8 +304,7 @@
 
         }
         serverChannel.close();
-        if (selector != null)
-            selector.close();
+        closeSelector();
     }
 
     
@@ -319,13 +319,34 @@
         if (selector != null) {
             try {
                 selector.wakeup();
-                selector.close();
+                closeSelector();
             } catch (Exception x) {
                 log.error("Unable to close cluster receiver selector.", x);
             } finally {
                 selector = null;
             }
         }
+    }
+
+    private void closeSelector() throws IOException {
+        Selector selector = this.selector;
+        this.selector = null;
+        if (selector==null) return;
+        try {
+            Iterator it = selector.keys().iterator();
+            // look at each key in the selected set
+            while (it.hasNext()) {
+                SelectionKey key = (SelectionKey)it.next();
+                key.channel().close();
+                key.attach(null);
+                key.cancel();
+            }
+        }catch ( IOException ignore ){
+            if (log.isWarnEnabled()) {
+                log.warn("Unable to cleanup on selector close.",ignore);
+            }
+        }catch ( ClosedSelectorException ignore){}
+        selector.close();
     }
 
     // ----------------------------------------------------------

Modified: 
tomcat/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java?view=diff&rev=552560&r1=552559&r2=552560
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java 
(original)
+++ 
tomcat/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java 
Mon Jul  2 11:46:20 2007
@@ -80,6 +80,45 @@
         channels[0].send(new Member[]{impl},new TestMsg(),0);
     }
     
+
+    public void testSendToRemote() throws Exception {
+        ReplicationTransmitter transmitter = (ReplicationTransmitter) 
channels[0].getChannelSender();
+        AbstractSender sender = (AbstractSender)transmitter.getTransport();
+        sender.setMaxRetryAttempts(0);
+        sender.setTimeout(60000);
+        MemberImpl impl = new MemberImpl("127.0.0.1",9999,1000,new 
byte[]{1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8});
+        for (int i=0; i<1000; i++) {
+            if (i%100==0) System.out.println("Sending message:"+(i+1));
+            channels[0].send(new Member[] {impl}, new TestMsg(), 0);
+        }
+    }
+
+
+    public void testSendToFailing() throws Exception {
+        ReplicationTransmitter transmitter = (ReplicationTransmitter) 
channels[0].getChannelSender();
+        AbstractSender sender = (AbstractSender)transmitter.getTransport();
+        sender.setMaxRetryAttempts(0);
+        sender.setTimeout(60000);
+        Member[] ma = channels[0].getMembers();
+        final Member m = channels[1].getLocalMember(true);
+        Thread st = new Thread() {
+            public void run() {
+                try {
+                    for (int i=0; i<10000; i++ ) { 
+                        channels[0].send(new Member[] {m}, new TestMsg(), 0);
+                    }
+                } catch (Exception x) {
+                    x.printStackTrace();
+                }
+            }
+        };
+        st.start();
+        Thread.sleep(250);
+        channels[1].stop(Channel.DEFAULT);
+        st.join();
+    }
+
+
     public void testKeepAliveCount() throws Exception {
         System.out.println("Setting keep alive count to 0");
         for (int i = 0; i < channels.length; i++) {



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to