Author: fhanik Date: Wed Feb 15 13:13:45 2006 New Revision: 378093 URL: http://svn.apache.org/viewcvs?rev=378093&view=rev Log: Slowly taking shape; getting ready to make the messaging a completely separate component that, if done correctly, should be usable elsewhere.
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java Wed Feb 15 13:13:45 2006 @@ -66,7 +66,7 @@ * @param options int - sender options, see class documentation * @return ClusterMessage[] - the replies from the members, if any. 
*/ - public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options); + public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) throws ChannelException; public void setClusterSender(ClusterSender sender); Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java Wed Feb 15 13:13:45 2006 @@ -102,7 +102,7 @@ * If you call this method twice, the last listener will be used. * @param listener The listener */ - public void addMembershipListener(MembershipListener listener); + public void setMembershipListener(MembershipListener listener); /** * removes the membership listener. 
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java Wed Feb 15 13:13:45 2006 @@ -15,6 +15,15 @@ */ package org.apache.catalina.cluster.group; +import org.apache.catalina.cluster.MembershipService; +import org.apache.catalina.cluster.Member; +import org.apache.catalina.cluster.ClusterMessage; +import org.apache.catalina.cluster.ChannelException; +import org.apache.catalina.cluster.ClusterSender; +import org.apache.catalina.cluster.ClusterReceiver; +import org.apache.catalina.cluster.ClusterChannel; + + /** * The channel coordinator object coordinates the membership service, * the sender and the receiver. @@ -23,8 +32,95 @@ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ public class ChannelCoordinator extends ChannelInterceptorBase { - public ChannelCoordinator() { - super(); + private ClusterReceiver clusterReceiver; + private ClusterSender clusterSender; + private MembershipService membershipService; + + + + /** + * Send a message to one or more members in the cluster + * @param destination Member[] - the destinations, null or zero length means all + * @param msg ClusterMessage - the message to send + * @param options int - sender options, see class documentation + * @return ClusterMessage[] - the replies from the members, if any. 
+ */ + public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) { + throw new UnsupportedOperationException(); + //implement sending and receiving logic. + } + + + /** + * Starts up the channel. This can be called multiple times for individual services to start + * The svc parameter can be the logical or value of any constants + * @param svc int value of <BR> + * DEFAULT - will start all services <BR> + * MBR_RX_SEQ - starts the membership receiver <BR> + * MBR_TX_SEQ - starts the membership broadcaster <BR> + * SND_TX_SEQ - starts the replication transmitter<BR> + * SND_RX_SEQ - starts the replication receiver<BR> + * @throws ChannelException if a startup error occurs or the service is already started. + */ + public void start(int svc) throws ChannelException { + try { + if ( (svc & ClusterChannel.MBR_RX_SEQ) == ClusterChannel.MBR_RX_SEQ) membershipService.start(membershipService.MBR_RX); + if ( (svc & ClusterChannel.SND_RX_SEQ) == ClusterChannel.SND_RX_SEQ) clusterReceiver.start(); + if ( (svc & ClusterChannel.SND_TX_SEQ) == ClusterChannel.SND_TX_SEQ) clusterSender.start(); + if ( (svc & ClusterChannel.MBR_TX_SEQ) == ClusterChannel.MBR_TX_SEQ) membershipService.start(membershipService.MBR_TX); + }catch ( ChannelException cx ) { + throw cx; + }catch ( Exception x ) { + throw new ChannelException(x); + } + } + + /** + * Shuts down the channel. This can be called multiple times for individual services to shutdown + * The svc parameter can be the logical or value of any constants + * @param svc int value of <BR> + * DEFAULT - will shutdown all services <BR> + * MBR_RX_SEQ - starts the membership receiver <BR> + * MBR_TX_SEQ - starts the membership broadcaster <BR> + * SND_TX_SEQ - starts the replication transmitter<BR> + * SND_RX_SEQ - starts the replication receiver<BR> + * @throws ChannelException if a startup error occurs or the service is already started. 
+ */ + public void stop(int svc) throws ChannelException { + try { + if ( (svc & ClusterChannel.MBR_RX_SEQ) == ClusterChannel.MBR_RX_SEQ) membershipService.stop(); + if ( (svc & ClusterChannel.SND_RX_SEQ) == ClusterChannel.SND_RX_SEQ) clusterReceiver.stop(); + if ( (svc & ClusterChannel.SND_TX_SEQ) == ClusterChannel.SND_TX_SEQ) clusterSender.stop(); + if ( (svc & ClusterChannel.MBR_TX_SEQ) == ClusterChannel.MBR_RX_SEQ) membershipService.stop(); + }catch ( Exception x ) { + throw new ChannelException(x); + } + + } + + public ClusterReceiver getClusterReceiver() { + return clusterReceiver; + } + + public ClusterSender getClusterSender() { + return clusterSender; + } + + public MembershipService getMembershipService() { + return membershipService; + } + + public void setClusterReceiver(ClusterReceiver clusterReceiver) { + this.clusterReceiver = clusterReceiver; + } + + public void setClusterSender(ClusterSender clusterSender) { + this.clusterSender = clusterSender; + } + + public void setMembershipService(MembershipService membershipService) { + this.membershipService = membershipService; + this.membershipService.setMembershipListener(this); } } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java Wed Feb 15 13:13:45 2006 @@ -66,12 +66,12 @@ public void memberAdded(Member member) { //notify upwards - getPrevious().memberAdded(member); + if ( getPrevious()!=null ) 
getPrevious().memberAdded(member); } public void memberDisappeared(Member member) { //notify upwards - getPrevious().memberDisappeared(member); + if ( getPrevious()!=null ) getPrevious().memberDisappeared(member); } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java Wed Feb 15 13:13:45 2006 @@ -23,6 +23,7 @@ import org.apache.catalina.cluster.MembershipService; import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.Member; +import java.util.ArrayList; /** * Channel interface @@ -32,13 +33,36 @@ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ public class GroupChannel implements ClusterChannel { - private ClusterReceiver clusterReceiver; - private ClusterSender clusterSender; - private MembershipService membershipService; + private ChannelCoordinator coordinator = new ChannelCoordinator(); + private ChannelInterceptorBase interceptors = null; public GroupChannel() { } + + /** + * Adds an interceptor to the stack for message processing + * @param interceptor ChannelInterceptorBase + */ + public void addInterceptor(ChannelInterceptorBase interceptor) { + if ( interceptors == null ) { + this.interceptors = interceptor; + this.interceptors.setNext(coordinator); + coordinator.setPrevious(this.interceptors); + } else { + ChannelInterceptorBase last = interceptors; + while ( last.getNext() != coordinator ) { + last = last.getNext(); + } + last.setNext(interceptor); + 
interceptor.setNext(coordinator); + interceptor.setPrevious(last); + coordinator.setPrevious(interceptor); + } + } + + + /** * Send a message to one or more members in the cluster * @param destination Member[] - the destinations, null or zero length means all @@ -46,7 +70,7 @@ * @param options int - sender options, see class documentation * @return ClusterMessage[] - the replies from the members, if any. */ - public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) { + public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) throws ChannelException { throw new UnsupportedOperationException("Method send not yet implemented."); } @@ -62,14 +86,7 @@ * @throws ChannelException if a startup error occurs or the service is already started. */ public void start(int svc) throws ChannelException { - try { - if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) membershipService.start(membershipService.MBR_RX); - if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) clusterReceiver.start(); - if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) clusterSender.start(); - if ( (svc & MBR_TX_SEQ) == MBR_TX_SEQ) membershipService.start(membershipService.MBR_TX); - }catch ( Exception x ) { - throw new ChannelException(x); - } + coordinator.start(svc); } /** @@ -84,39 +101,31 @@ * @throws ChannelException if a startup error occurs or the service is already started. 
*/ public void stop(int svc) throws ChannelException { - try { - if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) membershipService.stop(); - if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) clusterReceiver.stop(); - if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) clusterSender.stop(); - if ( (svc & MBR_TX_SEQ) == MBR_RX_SEQ) membershipService.stop(); - }catch ( Exception x ) { - throw new ChannelException(x); - } - + coordinator.stop(svc); } public ClusterReceiver getClusterReceiver() { - return clusterReceiver; + return coordinator.getClusterReceiver(); } public ClusterSender getClusterSender() { - return clusterSender; + return coordinator.getClusterSender(); } public MembershipService getMembershipService() { - return membershipService; + return coordinator.getMembershipService(); } public void setClusterReceiver(ClusterReceiver clusterReceiver) { - this.clusterReceiver = clusterReceiver; + coordinator.setClusterReceiver(clusterReceiver); } public void setClusterSender(ClusterSender clusterSender) { - this.clusterSender = clusterSender; + coordinator.setClusterSender(clusterSender); } public void setMembershipService(MembershipService membershipService) { - this.membershipService = membershipService; + coordinator.setMembershipService(membershipService); } } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java Wed Feb 15 13:13:45 2006 @@ -440,7 +440,7 @@ * so calling this method twice will result in only the second listener being active. 
* @param listener The listener */ - public void addMembershipListener(MembershipListener listener) { + public void setMembershipListener(MembershipListener listener) { this.listener = listener; } /** Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java?rev=378093&r1=378092&r2=378093&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Wed Feb 15 13:13:45 2006 @@ -761,9 +761,8 @@ } if(membershipService != null && clusterReceiver != null) { - membershipService.setLocalMemberProperties(clusterReceiver - .getHost(), clusterReceiver.getPort()); - membershipService.addMembershipListener(this); + membershipService.setLocalMemberProperties(clusterReceiver.getHost(), clusterReceiver.getPort()); + membershipService.setMembershipListener(this); membershipService.setCatalinaCluster(this); membershipService.start(); // start the deployer. --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]