Author: fhanik
Date: Tue Mar 7 07:40:30 2006
New Revision: 383911
URL: http://svn.apache.org/viewcvs?rev=383911&view=rev
Log:
Refactored to allow more than one listener.
Membership is retrievable throughout the interceptor stack, so that it is
available to all interceptors.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
Tue Mar 7 07:40:30 2006
@@ -82,8 +82,11 @@
*/
public void heartbeat();
- public void setMembershipListener(MembershipListener listener);
- public void setChannelListener(ChannelListener listener);
+ public void addMembershipListener(MembershipListener listener);
+ public void addChannelListener(ChannelListener listener);
+
+ public void removeMembershipListener(MembershipListener listener);
+ public void removeChannelListener(ChannelListener listener);
/**
* has members
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
Tue Mar 7 07:40:30 2006
@@ -43,4 +43,30 @@
public void messageReceived(ChannelMessage data);
public void heartbeat();
+
+ /**
+ * has members
+ */
+ public boolean hasMembers() ;
+
+ /**
+ * Get all current cluster members
+ * @return all members or empty array
+ */
+ public Member[] getMembers() ;
+
+ /**
+ * Return the member that represents this node.
+ *
+ * @return Member
+ */
+ public Member getLocalMember() ;
+
+ /**
+ *
+ * @param mbr Member
+ * @return Member
+ */
+ public Member getMember(Member mbr);
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
Tue Mar 7 07:40:30 2006
@@ -164,5 +164,40 @@
if ( clusterSender!=null ) clusterSender.heartbeat();
super.heartbeat();
}
+
+ /**
+ * Returns whether the membership service reports that there are members.
+ */
+ public boolean hasMembers() {
+ return this.getMembershipService().hasMembers();
+ }
+
+ /**
+ * Get all current cluster members
+ * @return all members or empty array
+ */
+ public Member[] getMembers() {
+ return this.getMembershipService().getMembers();
+ }
+
+ /**
+ * Looks up the given member through the membership service.
+ * @param mbr Member
+ * @return Member
+ */
+ public Member getMember(Member mbr){
+ return this.getMembershipService().getMember(mbr);
+ }
+
+
+ /**
+ * Return the member that represents this node.
+ *
+ * @return Member
+ */
+ public Member getLocalMember() {
+ return this.getMembershipService().getLocalMember();
+ }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
Tue Mar 7 07:40:30 2006
@@ -1,12 +1,12 @@
/*
* Copyright 1999,2004-2006 The Apache Software Foundation.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -31,25 +31,27 @@
* @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul
2005) $
*/
-public abstract class ChannelInterceptorBase implements ChannelInterceptor{
-
- protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(ChannelInterceptorBase.class);
-
+public abstract class ChannelInterceptorBase
+ implements ChannelInterceptor {
+
+ protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(
+ ChannelInterceptorBase.class);
+
private ChannelInterceptor next;
private ChannelInterceptor previous;
-
+
public ChannelInterceptorBase() {
-
+
}
-
+
public final void setNext(ChannelInterceptor next) {
this.next = next;
}
-
+
public final ChannelInterceptor getNext() {
return next;
}
-
+
public final void setPrevious(ChannelInterceptor previous) {
this.previous = previous;
}
@@ -58,38 +60,68 @@
return previous;
}
- public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws ChannelException {
- if ( getNext() != null ) getNext().sendMessage(destination, msg,
payload);
+ public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws
+ ChannelException {
+ if (getNext() != null) getNext().sendMessage(destination, msg,
payload);
}
-
+
public void messageReceived(ChannelMessage msg) {
- if ( getPrevious() != null ) getPrevious().messageReceived(msg);
+ if (getPrevious() != null) getPrevious().messageReceived(msg);
}
public boolean accept(ChannelMessage msg) {
return true;
}
-
public void memberAdded(Member member) {
//notify upwards
- if ( getPrevious()!=null ) getPrevious().memberAdded(member);
+ if (getPrevious() != null) getPrevious().memberAdded(member);
}
-
+
public void memberDisappeared(Member member) {
//notify upwards
- if ( getPrevious()!=null ) getPrevious().memberDisappeared(member);
+ if (getPrevious() != null) getPrevious().memberDisappeared(member);
}
-
-
public void heartbeat() {
- if ( getNext() != null ) getNext().heartbeat();
+ if (getNext() != null) getNext().heartbeat();
}
-
-
-
-
+ /**
+ * Returns whether the next interceptor reports members; false if there is no next interceptor.
+ */
+ public boolean hasMembers() {
+ if ( getNext()!=null )return getNext().hasMembers();
+ else return false;
+ }
+
+ /**
+ * Get all current cluster members
+ * @return the members reported by the next interceptor, or null if there is no next interceptor
+ */
+ public Member[] getMembers() {
+ if ( getNext()!=null ) return getNext().getMembers();
+ else return null;
+ }
+
+ /**
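+ * Looks up the given member through the next interceptor; returns null if there is no next interceptor.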
+ * @param mbr Member
+ * @return Member
+ */
+ public Member getMember(Member mbr) {
+ if ( getNext()!=null) return getNext().getMember(mbr);
+ else return null;
+ }
+
+ /**
+ * Return the member that represents this node.
+ *
+ * @return Member
+ */
+ public Member getLocalMember() {
+ if ( getNext()!=null ) return getNext().getLocalMember();
+ else return null;
+ }
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Tue Mar 7 07:40:30 2006
@@ -32,6 +32,7 @@
import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.io.ClusterData;
import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.ArrayList;
/**
* The GroupChannel manages the replication channel. It coordinates
@@ -46,8 +47,9 @@
private ChannelCoordinator coordinator = new ChannelCoordinator();
private ChannelInterceptor interceptors = null;
- private MembershipListener membershipListener;
- private ChannelListener channelListener;
+
+ private ArrayList membershipListeners = new ArrayList();
+ private ArrayList channelListeners = new ArrayList();
public GroupChannel() {
addInterceptor(this);
@@ -126,8 +128,12 @@
}
//get the actual member with the correct alive time
Member source = msg.getAddress();
- if ( channelListener != null &&
channelListener.accept(fwd,source))
- channelListener.messageReceived(fwd,source);
+
+ for ( int i=0; i<channelListeners.size(); i++ ) {
+ ChannelListener channelListener =
(ChannelListener)channelListeners.get(i);
+ if (channelListener != null && channelListener.accept(fwd,
source))
+ channelListener.messageReceived(fwd, source);
+ }//for
}catch ( Exception x ) {
log.error("Unable to deserialize channel message.",x);
}
@@ -135,12 +141,18 @@
public void memberAdded(Member member) {
//notify upwards
- if (membershipListener != null) membershipListener.memberAdded(member);
+ for (int i=0; i<membershipListeners.size(); i++ ) {
+ MembershipListener membershipListener =
(MembershipListener)membershipListeners.get(i);
+ if (membershipListener != null)
membershipListener.memberAdded(member);
+ }
}
public void memberDisappeared(Member member) {
//notify upwards
- if (membershipListener != null)
membershipListener.memberDisappeared(member);
+ for (int i=0; i<membershipListeners.size(); i++ ) {
+ MembershipListener membershipListener =
(MembershipListener)membershipListeners.get(i);
+ if (membershipListener != null)
membershipListener.memberDisappeared(member);
+ }
}
public ChannelInterceptor getFirstInterceptor() {
@@ -202,62 +214,29 @@
coordinator.setMembershipService(membershipService);
}
- public void setMembershipListener(MembershipListener membershipListener) {
- this.membershipListener = membershipListener;
+ public void addMembershipListener(MembershipListener membershipListener) {
+ if (!this.membershipListeners.contains(membershipListener) )
+ this.membershipListeners.add(membershipListener);
}
- public void setChannelListener(ChannelListener channelListener) {
-
- this.channelListener = channelListener;
+ public void removeMembershipListener(MembershipListener
membershipListener) {
+ membershipListeners.remove(membershipListener);
}
- public MembershipListener getMembershipListener() {
- return membershipListener;
+ public void addChannelListener(ChannelListener channelListener) {
+ if (!this.channelListeners.contains(channelListener) )
+ this.channelListeners.add(channelListener);
}
- public Iterator getInterceptors() {
- return new InterceptorIterator(this.getNext(),this.coordinator);
- }
-
- public ChannelListener getChannelListener() {
-
- return channelListener;
- }
-
- /**
- * has members
- */
- public boolean hasMembers() {
- return coordinator.getMembershipService().hasMembers();
+ public void removeChannelListener(ChannelListener channelListener) {
+ channelListeners.remove(channelListener);
}
- /**
- * Get all current cluster members
- * @return all members or empty array
- */
- public Member[] getMembers() {
- return coordinator.getMembershipService().getMembers();
- }
-
- /**
- *
- * @param mbr Member
- * @return Member
- */
- public Member getMember(Member mbr){
- return coordinator.getMembershipService().getMember(mbr);
+ public Iterator getInterceptors() {
+ return new InterceptorIterator(this.getNext(),this.coordinator);
}
- /**
- * Return the member that represents this node.
- *
- * @return Member
- */
- public Member getLocalMember() {
- return coordinator.getMembershipService().getLocalMember();
- }
-
public static class InterceptorIterator implements Iterator {
private ChannelInterceptor end;
private ChannelInterceptor start;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
Tue Mar 7 07:40:30 2006
@@ -212,7 +212,9 @@
socketChannel = null;
}
} catch ( Exception x ) {
- log.error("Unable to disconnect.",x);
+ log.error("Unable to disconnect. msg="+x.getMessage());
+ if ( log.isDebugEnabled() )
+ log.debug("Unable to disconnect. msg="+x.getMessage(),x);
} finally {
reset();
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
Tue Mar 7 07:40:30 2006
@@ -360,8 +360,8 @@
LoadMessage msg = new LoadMessage();
messageSize = LoadMessage.getMessageSize(msg);
- channel.setChannelListener(test);
- channel.setMembershipListener(test);
+ channel.addChannelListener(test);
+ channel.addMembershipListener(test);
channel.start(channel.DEFAULT);
Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
while ( threads > 1 ) {
Modified:
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=383911&r1=383910&r2=383911&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
(original)
+++
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
Tue Mar 7 07:40:30 2006
@@ -652,8 +652,8 @@
try {
if ( clusterDeployer != null ) clusterDeployer.setCluster(this);
this.registerClusterValve();
- channel.setMembershipListener(this);
- channel.setChannelListener(this);
+ channel.addMembershipListener(this);
+ channel.addChannelListener(this);
channel.start(channel.DEFAULT);
if (clusterDeployer != null) clusterDeployer.start();
this.started = true;
@@ -732,6 +732,8 @@
try {
if ( clusterDeployer != null ) clusterDeployer.setCluster(null);
channel.stop(Channel.DEFAULT);
+ channel.removeChannelListener(this);
+ channel.removeMembershipListener(this);
this.unregisterClusterValve();
} catch (Exception x) {
log.error("Unable to stop cluster valve.", x);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]