Author: kfujino
Date: Tue Dec 15 06:47:04 2015
New Revision: 1720077

URL: http://svn.apache.org/viewvc?rev=1720077&view=rev
Log:
Add support for the startup notification of local members in the static cluster.

Modified:
    tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml

Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=1720077&r1=1720076&r2=1720077&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java Tue Dec 15 06:47:04 2015
@@ -17,13 +17,17 @@
 package org.apache.catalina.tribes.group.interceptors;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.AbsoluteOrder;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 
@@ -31,6 +35,10 @@ public class StaticMembershipInterceptor
 
     private static final Log log = 
LogFactory.getLog(StaticMembershipInterceptor.class);
 
+    protected static final byte[] MEMBER_START = new byte[] { // ASCII bytes of "Local StaticMember Notification Data"
+        76, 111, 99, 97, 108, 32, 83, 116, 97, 116, 105, 99, 77, 101, 109, 98, 101, 114, 32, 78,
+        111, 116, 105, 102, 105, 99, 97, 116, 105, 111, 110, 32, 68, 97, 116, 97};
+
     protected ArrayList<Member> members = new ArrayList<Member>();
     protected Member localMember = null;
 
@@ -54,6 +62,21 @@ public class StaticMembershipInterceptor
         this.localMember = member;
     }
 
+    /**
+     * Intercepts the member-start notification. A message whose payload has
+     * the same length and the same bytes as {@link #MEMBER_START} is consumed
+     * here: the member returned by <code>getMember(msg.getAddress())</code>,
+     * if any, is passed to <code>super.memberAdded</code>. Every other message
+     * is handed to <code>super.messageReceived</code> unchanged.
+     *
+     * @param msg the incoming message
+     */
+    @Override
+    public void messageReceived(ChannelMessage msg) {
+        // Cheap length comparison first; the byte comparison only runs on a match.
+        boolean memberStart = msg.getMessage().getLength() == MEMBER_START.length
+                && Arrays.equals(MEMBER_START, msg.getMessage().getBytes());
+        if (!memberStart) {
+            super.messageReceived(msg);
+            return;
+        }
+        // A notification payload is never forwarded, even when no member matches.
+        Member sender = getMember(msg.getAddress());
+        if (sender == null) {
+            return;
+        }
+        super.memberAdded(sender);
+    }
+
     /**
      * has members
      */
@@ -112,17 +135,19 @@ public class StaticMembershipInterceptor
     public void start(int svc) throws ChannelException {
         if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) 
super.start(Channel.SND_RX_SEQ); 
         if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) 
super.start(Channel.SND_TX_SEQ); 
-        final Member[] mbrs = members.toArray(new Member[members.size()]);
         final ChannelInterceptorBase base = this;
-        Thread t = new Thread() {
-            @Override
-            public void run() {
-                for (int i=0; i<mbrs.length; i++ ) {
-                    base.memberAdded(mbrs[i]);
+        for (final Member member : members) {
+            Thread t = new Thread() {
+                @Override
+                public void run() {
+                    base.memberAdded(member);
+                    if (getfirstInterceptor().getMember(member) != null) {
+                        sendLocalMember(new Member[]{member});
+                    }
                 }
-            }
-        };
-        t.start();
+            };
+            t.start();
+        }
         super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ));
 
         // check required interceptors
@@ -146,4 +171,28 @@ public class StaticMembershipInterceptor
         }
     }
 
+    /**
+     * Sends a message whose payload is <code>MEMBER_START</code> to the given
+     * members. Nothing is sent for a <code>null</code> or empty array. A
+     * <code>ChannelException</code> raised while sending is logged at warn
+     * level and not rethrown.
+     *
+     * @param members the destination members; may be <code>null</code> or empty
+     */
+    protected void sendLocalMember(Member[] members) {
+        if (members != null && members.length > 0) {
+            ChannelData notification = new ChannelData(true);
+            notification.setAddress(getLocalMember(false));
+            notification.setTimestamp(System.currentTimeMillis());
+            notification.setOptions(getOptionFlag());
+            notification.setMessage(new XByteBuffer(MEMBER_START, false));
+            try {
+                super.sendMessage(members, notification, null);
+            } catch (ChannelException cx) {
+                log.warn("Local member notification failed.",cx);
+            }
+        }
+    }
+
+    /**
+     * Follows the <code>getPrevious()</code> links upstream from this
+     * interceptor and returns the interceptor that sits immediately after the
+     * head of the chain (the head being the element whose
+     * <code>getPrevious()</code> is <code>null</code>).
+     * <p>
+     * If this interceptor directly follows the head, or has no predecessor at
+     * all, this interceptor itself is returned. The latter case previously
+     * failed with a <code>NullPointerException</code>.
+     *
+     * @return the interceptor directly after the chain head, or this
+     *         interceptor when nothing lies between it and the head;
+     *         never <code>null</code>
+     */
+    protected ChannelInterceptor getfirstInterceptor() {
+        ChannelInterceptor result = this;
+        ChannelInterceptor previous = getPrevious();
+        // Step upstream only while 'previous' still has a predecessor of its
+        // own, so the walk stops one element short of the chain head.
+        while (previous != null && previous.getPrevious() != null) {
+            result = previous;
+            previous = previous.getPrevious();
+        }
+        return result;
+    }
+
 }
\ No newline at end of file

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1720077&r1=1720076&r2=1720077&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Tue Dec 15 06:47:04 2015
@@ -81,6 +81,14 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="Tribes">
+    <changelog>
+      <fix>
+        Add support for the startup notification of local members in the static
+        cluster. (kfujino)
+      </fix>
+    </changelog>
+  </subsection>
 </section>
 <section name="Tomcat 7.0.67 (violetagg)" rtext="released 2015-12-10">
   <subsection name="Catalina">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to