Author: fhanik
Date: Tue Mar  7 07:40:30 2006
New Revision: 383911

URL: http://svn.apache.org/viewcvs?rev=383911&view=rev
Log:
refactored to allow more than one listener.
Membership is retrievable throughout the interceptor stack, so that it is 
available to all interceptors.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
    
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
 Tue Mar  7 07:40:30 2006
@@ -82,8 +82,11 @@
      */
     public void heartbeat();
     
-    public void setMembershipListener(MembershipListener listener);
-    public void setChannelListener(ChannelListener listener);
+    public void addMembershipListener(MembershipListener listener);
+    public void addChannelListener(ChannelListener listener);
+
+    public void removeMembershipListener(MembershipListener listener);
+    public void removeChannelListener(ChannelListener listener);
     
     /**
      * has members

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
 Tue Mar  7 07:40:30 2006
@@ -43,4 +43,30 @@
     public void messageReceived(ChannelMessage data);
     
     public void heartbeat();
+    
+    /**
+     * has members
+     */
+    public boolean hasMembers() ;
+
+    /**
+     * Get all current cluster members
+     * @return all members or empty array 
+     */
+    public Member[] getMembers() ;
+
+    /**
+     * Return the member that represents this node.
+     * 
+     * @return Member
+     */
+    public Member getLocalMember() ;
+
+    /**
+     * 
+     * @param mbr Member
+     * @return Member
+     */
+    public Member getMember(Member mbr);
+
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 Tue Mar  7 07:40:30 2006
@@ -164,5 +164,40 @@
         if ( clusterSender!=null ) clusterSender.heartbeat();
         super.heartbeat();
     }
+    
+    /**
+     * has members
+     */
+    public boolean hasMembers() {
+        return this.getMembershipService().hasMembers();
+    }
+
+    /**
+     * Get all current cluster members
+     * @return all members or empty array
+     */
+    public Member[] getMembers() {
+        return this.getMembershipService().getMembers();
+    }
+
+    /**
+     * 
+     * @param mbr Member
+     * @return Member
+     */
+    public Member getMember(Member mbr){
+        return this.getMembershipService().getMember(mbr);
+    }
+
+
+    /**
+     * Return the member that represents this node.
+     *
+     * @return Member
+     */
+    public Member getLocalMember() {
+        return this.getMembershipService().getLocalMember();
+    }
+
    
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
 Tue Mar  7 07:40:30 2006
@@ -1,12 +1,12 @@
 /*
  * Copyright 1999,2004-2006 The Apache Software Foundation.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -31,25 +31,27 @@
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
  */
 
-public abstract class ChannelInterceptorBase implements ChannelInterceptor{
-    
-    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(ChannelInterceptorBase.class);
-    
+public abstract class ChannelInterceptorBase
+    implements ChannelInterceptor {
+
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(
+        ChannelInterceptorBase.class);
+
     private ChannelInterceptor next;
     private ChannelInterceptor previous;
-    
+
     public ChannelInterceptorBase() {
-        
+
     }
-    
+
     public final void setNext(ChannelInterceptor next) {
         this.next = next;
     }
-    
+
     public final ChannelInterceptor getNext() {
         return next;
     }
-    
+
     public final void setPrevious(ChannelInterceptor previous) {
         this.previous = previous;
     }
@@ -58,38 +60,68 @@
         return previous;
     }
 
-    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
-        if ( getNext() != null ) getNext().sendMessage(destination, msg, 
payload);
+    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws
+        ChannelException {
+        if (getNext() != null) getNext().sendMessage(destination, msg, 
payload);
     }
-    
+
     public void messageReceived(ChannelMessage msg) {
-        if ( getPrevious() != null ) getPrevious().messageReceived(msg);
+        if (getPrevious() != null) getPrevious().messageReceived(msg);
     }
 
     public boolean accept(ChannelMessage msg) {
         return true;
     }
 
-    
     public void memberAdded(Member member) {
         //notify upwards
-        if ( getPrevious()!=null ) getPrevious().memberAdded(member);
+        if (getPrevious() != null) getPrevious().memberAdded(member);
     }
-    
+
     public void memberDisappeared(Member member) {
         //notify upwards
-        if ( getPrevious()!=null ) getPrevious().memberDisappeared(member);
+        if (getPrevious() != null) getPrevious().memberDisappeared(member);
     }
-    
-    
 
     public void heartbeat() {
-        if ( getNext() != null ) getNext().heartbeat();
+        if (getNext() != null) getNext().heartbeat();
     }
-    
 
-    
-    
-    
+    /**
+     * has members
+     */
+    public boolean hasMembers() {
+        if ( getNext()!=null )return getNext().hasMembers();
+        else return false;
+    }
+
+    /**
+     * Get all current cluster members
+     * @return all members or empty array
+     */
+    public Member[] getMembers() {
+        if ( getNext()!=null ) return getNext().getMembers();
+        else return null;
+    }
+
+    /**
+     *
+     * @param mbr Member
+     * @return Member
+     */
+    public Member getMember(Member mbr) {
+        if ( getNext()!=null) return getNext().getMember(mbr);
+        else return null;
+    }
+
+    /**
+     * Return the member that represents this node.
+     *
+     * @return Member
+     */
+    public Member getLocalMember() {
+        if ( getNext()!=null ) return getNext().getLocalMember();
+        else return null;
+    }
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Tue Mar  7 07:40:30 2006
@@ -32,6 +32,7 @@
 import org.apache.catalina.tribes.MembershipService;
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.ArrayList;
 
 /**
  * The GroupChannel manages the replication channel. It coordinates
@@ -46,8 +47,9 @@
     
     private ChannelCoordinator coordinator = new ChannelCoordinator();
     private ChannelInterceptor interceptors = null;
-    private MembershipListener membershipListener;
-    private ChannelListener channelListener;
+    
+    private ArrayList membershipListeners = new ArrayList();
+    private ArrayList channelListeners = new ArrayList();
 
     public GroupChannel() {
         addInterceptor(this);
@@ -126,8 +128,12 @@
             }
             //get the actual member with the correct alive time
             Member source = msg.getAddress();
-            if ( channelListener != null && 
channelListener.accept(fwd,source)) 
-                channelListener.messageReceived(fwd,source);
+            
+            for ( int i=0; i<channelListeners.size(); i++ ) {
+                ChannelListener channelListener = 
(ChannelListener)channelListeners.get(i);
+                if (channelListener != null && channelListener.accept(fwd, 
source))
+                    channelListener.messageReceived(fwd, source);
+            }//for
         }catch ( Exception x ) {
             log.error("Unable to deserialize channel message.",x);
         }
@@ -135,12 +141,18 @@
     
     public void memberAdded(Member member) {
         //notify upwards
-        if (membershipListener != null) membershipListener.memberAdded(member);
+        for (int i=0; i<membershipListeners.size(); i++ ) {
+            MembershipListener membershipListener = 
(MembershipListener)membershipListeners.get(i);
+            if (membershipListener != null) 
membershipListener.memberAdded(member);
+        }
     }
     
     public void memberDisappeared(Member member) {
         //notify upwards
-        if (membershipListener != null) 
membershipListener.memberDisappeared(member);
+        for (int i=0; i<membershipListeners.size(); i++ ) {
+            MembershipListener membershipListener = 
(MembershipListener)membershipListeners.get(i);
+            if (membershipListener != null) 
membershipListener.memberDisappeared(member);
+        }
     }    
     
     public ChannelInterceptor getFirstInterceptor() {
@@ -202,62 +214,29 @@
         coordinator.setMembershipService(membershipService);
     }
 
-    public void setMembershipListener(MembershipListener membershipListener) {
-        this.membershipListener = membershipListener;
+    public void addMembershipListener(MembershipListener membershipListener) {
+        if (!this.membershipListeners.contains(membershipListener) )
+            this.membershipListeners.add(membershipListener);
     }
 
-    public void setChannelListener(ChannelListener channelListener) {
-
-        this.channelListener = channelListener;
+    public void removeMembershipListener(MembershipListener 
membershipListener) {
+        membershipListeners.remove(membershipListener);
     }
 
-    public MembershipListener getMembershipListener() {
-        return membershipListener;
+    public void addChannelListener(ChannelListener channelListener) {
+        if (!this.channelListeners.contains(channelListener) )
+            this.channelListeners.add(channelListener);
     }
     
-    public Iterator getInterceptors() { 
-        return new InterceptorIterator(this.getNext(),this.coordinator);
-    }
-
-    public ChannelListener getChannelListener() {
-
-        return channelListener;
-    }
-
-    /**
-     * has members
-     */
-    public boolean hasMembers() {
-        return coordinator.getMembershipService().hasMembers();
+    public void removeChannelListener(ChannelListener channelListener) {
+        channelListeners.remove(channelListener);
     }
 
-    /**
-     * Get all current cluster members
-     * @return all members or empty array
-     */
-    public Member[] getMembers() {
-        return coordinator.getMembershipService().getMembers();
-    }
-    
-    /**
-     * 
-     * @param mbr Member
-     * @return Member
-     */
-    public Member getMember(Member mbr){
-        return coordinator.getMembershipService().getMember(mbr);
+    public Iterator getInterceptors() { 
+        return new InterceptorIterator(this.getNext(),this.coordinator);
     }
 
 
-    /**
-     * Return the member that represents this node.
-     *
-     * @return Member
-     */
-    public Member getLocalMember() {
-        return coordinator.getMembershipService().getLocalMember();
-    }
-    
     public static class InterceptorIterator implements Iterator {
         private ChannelInterceptor end;
         private ChannelInterceptor start;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
 Tue Mar  7 07:40:30 2006
@@ -212,7 +212,9 @@
                 socketChannel = null;
             }
         } catch ( Exception x ) {
-            log.error("Unable to disconnect.",x);
+            log.error("Unable to disconnect. msg="+x.getMessage());
+            if ( log.isDebugEnabled() ) 
+                log.debug("Unable to disconnect. msg="+x.getMessage(),x);
         } finally {
             reset();
         }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
 Tue Mar  7 07:40:30 2006
@@ -360,8 +360,8 @@
         LoadMessage msg = new LoadMessage();
         
         messageSize = LoadMessage.getMessageSize(msg);
-        channel.setChannelListener(test);
-        channel.setMembershipListener(test);
+        channel.addChannelListener(test);
+        channel.addMembershipListener(test);
         channel.start(channel.DEFAULT);
         Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
         while ( threads > 1 ) {

Modified: 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
 Tue Mar  7 07:40:30 2006
@@ -652,8 +652,8 @@
         try {
             if ( clusterDeployer != null ) clusterDeployer.setCluster(this);
             this.registerClusterValve();
-            channel.setMembershipListener(this);
-            channel.setChannelListener(this);
+            channel.addMembershipListener(this);
+            channel.addChannelListener(this);
             channel.start(channel.DEFAULT);
             if (clusterDeployer != null) clusterDeployer.start();
             this.started = true;
@@ -732,6 +732,8 @@
         try {
             if ( clusterDeployer != null ) clusterDeployer.setCluster(null);
             channel.stop(Channel.DEFAULT);
+            channel.removeChannelListener(this);
+            channel.removeMembershipListener(this);
             this.unregisterClusterValve();
         } catch (Exception x) {
             log.error("Unable to stop cluster valve.", x);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to