Author: fhanik Date: Tue Mar 7 07:40:30 2006 New Revision: 383911 URL: http://svn.apache.org/viewcvs?rev=383911&view=rev Log: refactored to allow more than one listener. Membership is retrievable throughout the interceptor stack, so that it is available to all interceptors.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Tue Mar 7 07:40:30 2006 @@ -82,8 +82,11 @@ */ public void heartbeat(); - public void setMembershipListener(MembershipListener listener); - public void setChannelListener(ChannelListener listener); + public void addMembershipListener(MembershipListener listener); + public void addChannelListener(ChannelListener listener); + + public void removeMembershipListener(MembershipListener listener); + public void removeChannelListener(ChannelListener listener); /** * has members Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java Tue Mar 7 07:40:30 2006 @@ -43,4 +43,30 @@ public void messageReceived(ChannelMessage data); public void heartbeat(); + + /** + * has members + */ + public boolean hasMembers() ; + + /** + * Get all current cluster members + * @return all members or empty array + */ + public Member[] getMembers() ; + + /** + * Return the member that represents this node. + * + * @return Member + */ + public Member getLocalMember() ; + + /** + * + * @param mbr Member + * @return Member + */ + public Member getMember(Member mbr); + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Tue Mar 7 07:40:30 2006 @@ -164,5 +164,40 @@ if ( clusterSender!=null ) clusterSender.heartbeat(); super.heartbeat(); } + + /** + * has members + */ + public boolean hasMembers() { + return this.getMembershipService().hasMembers(); + } + + /** + * Get all current cluster members + * @return all members or empty array + */ + public Member[] getMembers() { + return this.getMembershipService().getMembers(); + } + + /** + * + * @param mbr Member + * @return Member + */ + public Member getMember(Member mbr){ + return this.getMembershipService().getMember(mbr); + } + + + /** + * Return the member that represents this node. + * + * @return Member + */ + public Member getLocalMember() { + return this.getMembershipService().getLocalMember(); + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java Tue Mar 7 07:40:30 2006 @@ -1,12 +1,12 @@ /* * Copyright 1999,2004-2006 The Apache Software Foundation. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,25 +31,27 @@ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ -public abstract class ChannelInterceptorBase implements ChannelInterceptor{ - - protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ChannelInterceptorBase.class); - +public abstract class ChannelInterceptorBase + implements ChannelInterceptor { + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( + ChannelInterceptorBase.class); + private ChannelInterceptor next; private ChannelInterceptor previous; - + public ChannelInterceptorBase() { - + } - + public final void setNext(ChannelInterceptor next) { this.next = next; } - + public final ChannelInterceptor getNext() { return next; } - + public final void setPrevious(ChannelInterceptor previous) { this.previous = previous; } @@ -58,38 +60,68 @@ return previous; } - public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { - if ( getNext() != null ) getNext().sendMessage(destination, msg, payload); + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws + ChannelException { + if (getNext() != null) getNext().sendMessage(destination, msg, payload); } - + public void messageReceived(ChannelMessage msg) { - if ( getPrevious() != null ) getPrevious().messageReceived(msg); + if (getPrevious() != null) getPrevious().messageReceived(msg); } public boolean accept(ChannelMessage msg) { return true; } - public void memberAdded(Member member) { //notify upwards - if ( getPrevious()!=null ) getPrevious().memberAdded(member); + if (getPrevious() != null) getPrevious().memberAdded(member); } - + public void memberDisappeared(Member member) { //notify upwards - if ( getPrevious()!=null ) getPrevious().memberDisappeared(member); + if (getPrevious() != null) getPrevious().memberDisappeared(member); } - - public void heartbeat() { - if ( getNext() != null ) getNext().heartbeat(); + if (getNext() != null) getNext().heartbeat(); } - - - - + /** + * has members + */ + public boolean hasMembers() { + if ( getNext()!=null )return getNext().hasMembers(); + else return false; + } + + /** + * Get all current cluster members + * @return all members or empty array + */ + public Member[] getMembers() { + if ( getNext()!=null ) return getNext().getMembers(); + else return null; + } + + /** + * + * @param mbr Member + * @return Member + */ + public Member getMember(Member mbr) { + if ( getNext()!=null) return getNext().getMember(mbr); + else return null; + } + + /** + * Return the member that represents this node. + * + * @return Member + */ + public Member getLocalMember() { + if ( getNext()!=null ) return getNext().getLocalMember(); + else return null; + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Tue Mar 7 07:40:30 2006 @@ -32,6 +32,7 @@ import org.apache.catalina.tribes.MembershipService; import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; +import java.util.ArrayList; /** * The GroupChannel manages the replication channel. It coordinates @@ -46,8 +47,9 @@ private ChannelCoordinator coordinator = new ChannelCoordinator(); private ChannelInterceptor interceptors = null; - private MembershipListener membershipListener; - private ChannelListener channelListener; + + private ArrayList membershipListeners = new ArrayList(); + private ArrayList channelListeners = new ArrayList(); public GroupChannel() { addInterceptor(this); @@ -126,8 +128,12 @@ } //get the actual member with the correct alive time Member source = msg.getAddress(); - if ( channelListener != null && channelListener.accept(fwd,source)) - channelListener.messageReceived(fwd,source); + + for ( int i=0; i<channelListeners.size(); i++ ) { + ChannelListener channelListener = (ChannelListener)channelListeners.get(i); + if (channelListener != null && channelListener.accept(fwd, source)) + channelListener.messageReceived(fwd, source); + }//for }catch ( Exception x ) { log.error("Unable to deserialize channel message.",x); } @@ -135,12 +141,18 @@ public void memberAdded(Member member) { //notify upwards - if (membershipListener != null) membershipListener.memberAdded(member); + for (int i=0; i<membershipListeners.size(); i++ ) { + MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i); + if (membershipListener != null) membershipListener.memberAdded(member); + } } public void memberDisappeared(Member member) { //notify upwards - if (membershipListener != null) membershipListener.memberDisappeared(member); + for (int i=0; i<membershipListeners.size(); i++ ) { + MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i); + if (membershipListener != null) membershipListener.memberDisappeared(member); + } } public ChannelInterceptor getFirstInterceptor() { @@ -202,62 +214,29 @@ coordinator.setMembershipService(membershipService); } - public void setMembershipListener(MembershipListener membershipListener) { - this.membershipListener = membershipListener; + public void addMembershipListener(MembershipListener membershipListener) { + if (!this.membershipListeners.contains(membershipListener) ) + this.membershipListeners.add(membershipListener); } - public void setChannelListener(ChannelListener channelListener) { - - this.channelListener = channelListener; + public void removeMembershipListener(MembershipListener membershipListener) { + membershipListeners.remove(membershipListener); } - public MembershipListener getMembershipListener() { - return membershipListener; + public void addChannelListener(ChannelListener channelListener) { + if (!this.channelListeners.contains(channelListener) ) + this.channelListeners.add(channelListener); } - public Iterator getInterceptors() { - return new InterceptorIterator(this.getNext(),this.coordinator); - } - - public ChannelListener getChannelListener() { - - return channelListener; - } - - /** - * has members - */ - public boolean hasMembers() { - return coordinator.getMembershipService().hasMembers(); + public void removeChannelListener(ChannelListener channelListener) { + channelListeners.remove(channelListener); } - /** - * Get all current cluster members - * @return all members or empty array - */ - public Member[] getMembers() { - return coordinator.getMembershipService().getMembers(); - } - - /** - * - * @param mbr Member - * @return Member - */ - public Member getMember(Member mbr){ - return coordinator.getMembershipService().getMember(mbr); + public Iterator getInterceptors() { + return new InterceptorIterator(this.getNext(),this.coordinator); } - /** - * Return the member that represents this node. - * - * @return Member - */ - public Member getLocalMember() { - return coordinator.getMembershipService().getLocalMember(); - } - public static class InterceptorIterator implements Iterator { private ChannelInterceptor end; private ChannelInterceptor start; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Tue Mar 7 07:40:30 2006 @@ -212,7 +212,9 @@ socketChannel = null; } } catch ( Exception x ) { - log.error("Unable to disconnect.",x); + log.error("Unable to disconnect. msg="+x.getMessage()); + if ( log.isDebugEnabled() ) + log.debug("Unable to disconnect. msg="+x.getMessage(),x); } finally { reset(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Tue Mar 7 07:40:30 2006 @@ -360,8 +360,8 @@ LoadMessage msg = new LoadMessage(); messageSize = LoadMessage.getMessageSize(msg); - channel.setChannelListener(test); - channel.setMembershipListener(test); + channel.addChannelListener(test); + channel.addMembershipListener(test); channel.start(channel.DEFAULT); Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); while ( threads > 1 ) { Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=383911&r1=383910&r2=383911&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Tue Mar 7 07:40:30 2006 @@ -652,8 +652,8 @@ try { if ( clusterDeployer != null ) clusterDeployer.setCluster(this); this.registerClusterValve(); - channel.setMembershipListener(this); - channel.setChannelListener(this); + channel.addMembershipListener(this); + channel.addChannelListener(this); channel.start(channel.DEFAULT); if (clusterDeployer != null) clusterDeployer.start(); this.started = true; @@ -732,6 +732,8 @@ try { if ( clusterDeployer != null ) clusterDeployer.setCluster(null); channel.stop(Channel.DEFAULT); + channel.removeChannelListener(this); + channel.removeMembershipListener(this); this.unregisterClusterValve(); } catch (Exception x) { log.error("Unable to stop cluster valve.", x); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]