Author: fhanik
Date: Tue May 16 10:01:22 2006
New Revision: 406989
URL: http://svn.apache.org/viewcvs?rev=406989&view=rev
Log:
Documented the group channel
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=406989&r1=406988&r2=406989&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Tue May 16 10:01:22 2006
@@ -39,33 +39,81 @@
import org.apache.catalina.tribes.UniqueId;
/**
+ * The default implementation of a Channel.<br>
* The GroupChannel manages the replication channel. It coordinates
* message being sent and received with membership announcements.
- * The channel has an chain of interceptors that can modify the message or
perform other logic.
- * It manages a complete cluster group, both membership and replication.
+ * The channel has a chain of interceptors that can modify the message or
perform other logic.<br>
+ * It manages a complete group, both membership and replication.
* @author Filip Hanik
* @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul
2005) $
*/
public class GroupChannel extends ChannelInterceptorBase implements
ManagedChannel {
- private boolean heartbeat = true;
+ /**
+ * Flag to determine if the channel manages its own heartbeat.
+ * If set to true, the channel will start a local thread for the
heartbeat.
+ */
+ protected boolean heartbeat = true;
+ /**
+ * If <code>heartbeat == true</code> then how often do we want this
+ * heartbeat to run. default is one minute
+ */
protected long heartbeatSleeptime = 60*1000;//only run once a minute
+
+ /**
+ * Internal heartbeat thread
+ */
protected HeartbeatThread hbthread = null;
- private ChannelCoordinator coordinator = new ChannelCoordinator();
- private ChannelInterceptor interceptors = null;
+ /**
+ * The <code>ChannelCoordinator</code> coordinates the bottom layer
components:<br>
+ * - MembershipService<br>
+ * - ChannelSender <br>
+ * - ChannelReceiver<br>
+ */
+ protected ChannelCoordinator coordinator = new ChannelCoordinator();
- private ArrayList membershipListeners = new ArrayList();
- private ArrayList channelListeners = new ArrayList();
- private boolean optionCheck = false;
-
+ /**
+ * The first interceptor in the interceptor stack.
+ * The interceptors are chained in a linked list, so we only need a
reference to the
+ * first one
+ */
+ protected ChannelInterceptor interceptors = null;
+
+ /**
+ * A list of membership listeners that subscribe to membership
announcements
+ */
+ protected ArrayList membershipListeners = new ArrayList();
+
+ /**
+ * A list of channel listeners that subscribe to incoming messages
+ */
+ protected ArrayList channelListeners = new ArrayList();
+
+ /**
+ * If set to true, the GroupChannel will check on start that no two interceptors use the same option flag
+ */
+ protected boolean optionCheck = false;
+ /**
+ * Creates a GroupChannel. This constructor will also
+ * add the first interceptor in the GroupChannel.<br>
+ * The first interceptor is always the channel itself.
+ */
public GroupChannel() {
addInterceptor(this);
}
/**
- * Adds an interceptor to the stack for message processing
+ * Adds an interceptor to the stack for message processing<br>
+ * Interceptors are ordered in the way they are added.<br>
+ * <code>channel.addInterceptor(A);</code><br>
+ * <code>channel.addInterceptor(C);</code><br>
+ * <code>channel.addInterceptor(B);</code><br>
+ * Will result in an interceptor stack like this:<br>
+ * <code>A -> C -> B</code><br>
+ * The complete stack will look like this:<br>
+ * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
* @param interceptor ChannelInterceptorBase
*/
public void addInterceptor(ChannelInterceptor interceptor) {
@@ -86,21 +134,42 @@
}
}
+ /**
+ * Sends a heartbeat through the interceptor stack.<br>
+ * Invoke this method from the application on a periodic basis if
+ * you have turned off internal heartbeats
<code>channel.setHeartbeat(false)</code>
+ */
public void heartbeat() {
super.heartbeat();
}
/**
- * Send a message to one or more members in the cluster
- * @param destination Member[] - the destinations, null or zero length
means all
- * @param msg ClusterMessage - the message to send
- * @param options int - sender options, see class documentation
- * @return ClusterMessage[] - the replies from the members, if any.
+ * Send a message to the destinations specified
+ * @param destination Member[] - destination.length > 0
+ * @param msg Serializable - the message to send
+ * @param options int - sender options, options can trigger guarantee
levels and different interceptors to
+ * react to the message see class documentation for the
<code>Channel</code> object.<br>
+ * @return UniqueId - the unique Id that was assigned to this message
+ * @throws ChannelException - if an error occurs processing the message
+ * @see org.apache.catalina.tribes.Channel
*/
public UniqueId send(Member[] destination, Serializable msg, int options)
throws ChannelException {
return send(destination,msg,options,null);
}
+
+ /**
+ * Send a message to the destinations specified
+ * @param destination Member[] - destination.length > 0
+ * @param msg Serializable - the message to send
+ * @param options int - sender options, options can trigger guarantee
levels and different interceptors to
+ * react to the message see class documentation for the
<code>Channel</code> object.<br>
+ * @param handler - callback object for error handling and completion
notification, used when a message is
+ * sent asynchronously using the
<code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
+ * @return UniqueId - the unique Id that was assigned to this message
+ * @throws ChannelException - if an error occurs processing the message
+ * @see org.apache.catalina.tribes.Channel
+ */
public UniqueId send(Member[] destination, Serializable msg, int options,
ErrorHandler handler) throws ChannelException {
if ( msg == null ) throw new ChannelException("Cant send a NULL
message");
try {
@@ -114,6 +183,7 @@
options = options | SEND_OPTIONS_BYTE_MESSAGE;
} else {
b = XByteBuffer.serialize(msg);
+ options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
}
data.setOptions(options);
XByteBuffer buffer = new XByteBuffer(b.length+128,false);
@@ -133,7 +203,14 @@
}
-
+ /**
+ * Callback from the interceptor stack. <br>
+ * When a message is received from a remote node, this method will be
invoked by
+ * the previous interceptor.<br>
+ * This method can also be used to send a message to other components
within the same application,
+ * but it's an extreme case, and you're probably better off doing that
logic within the application itself.
+ * @param msg ChannelMessage
+ */
public void messageReceived(ChannelMessage msg) {
if ( msg == null ) return;
try {
@@ -155,6 +232,8 @@
}
}//for
if ((!rx) && (fwd instanceof RpcMessage)) {
+ //if we have a message that requires a response,
+ //but none was given, send back an immediate one
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
} catch ( Exception x ) {
@@ -162,17 +241,29 @@
}
}
- protected void sendNoRpcChannelReply(RpcMessage msg, Member source) {
+ /**
+ * Sends a <code>NoRpcChannelReply</code> message to a member<br>
+ * This method gets invoked by the channel if an RPC message comes in
+ * and no channel listener accepts the message. This avoids a timeout.
+ * @param msg RpcMessage
+ * @param destination Member - the destination for the reply
+ */
+ protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
try {
//avoid circular loop
if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
RpcMessage.NoRpcChannelReply reply = new
RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
- send(new Member[]{source},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ send(new
Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
} catch ( Exception x ) {
log.error("Unable to find rpc channel, failed to send
NoRpcChannelReply.",x);
}
}
+ /**
+ * memberAdded gets invoked by the interceptor below the channel
+ * and the channel will broadcast it to the membership listeners
+ * @param member Member - the new member
+ */
public void memberAdded(Member member) {
//notify upwards
for (int i=0; i<membershipListeners.size(); i++ ) {
@@ -181,6 +272,11 @@
}
}
+ /**
+ * memberDisappeared gets invoked by the interceptor below the channel
+ * and the channel will broadcast it to the membership listeners
+ * @param member Member - the member that left or crashed
+ */
public void memberDisappeared(Member member) {
//notify upwards
for (int i=0; i<membershipListeners.size(); i++ ) {
@@ -189,6 +285,11 @@
}
}
+ /**
+ * Sets up the default implementation interceptor stack
+ * if no interceptors have been added
+ * @throws ChannelException
+ */
protected synchronized void setupDefaultStack() throws ChannelException {
if ( getFirstInterceptor() != null &&
@@ -210,6 +311,11 @@
}
}
+ /**
+ * Validates the option flags that each interceptor is using and reports
+ * an error if two interceptors share the same flag.
+ * @throws ChannelException
+ */
protected void checkOptionFlags() throws ChannelException {
StringBuffer conflicts = new StringBuffer();
ChannelInterceptor first = interceptors;
@@ -239,6 +345,12 @@
}
+ /**
+ * Starts the channel
+ * @param svc int - what service to start
+ * @throws ChannelException
+ * @see org.apache.catalina.tribes.Channel#start(int)
+ */
public synchronized void start(int svc) throws ChannelException {
setupDefaultStack();
if (optionCheck) checkOptionFlags();
@@ -249,6 +361,12 @@
}
}
+ /**
+ * Stops the channel
+ * @param svc int
+ * @throws ChannelException
+ * @see org.apache.catalina.tribes.Channel#stop(int)
+ */
public synchronized void stop(int svc) throws ChannelException {
if (hbthread != null) {
hbthread.stopHeartbeat();
@@ -257,81 +375,174 @@
super.stop(svc);
}
+ /**
+ * Returns the first interceptor of the stack. Useful for traversal.
+ * @return ChannelInterceptor
+ */
public ChannelInterceptor getFirstInterceptor() {
if (interceptors != null) return interceptors;
else return coordinator;
}
+ /**
+ * Returns the channel receiver component
+ * @return ChannelReceiver
+ */
public ChannelReceiver getChannelReceiver() {
return coordinator.getClusterReceiver();
}
-
+
+ /**
+ * Returns the channel sender component
+ * @return ChannelSender
+ */
public ChannelSender getChannelSender() {
return coordinator.getClusterSender();
}
+ /**
+ * Returns the membership service component
+ * @return MembershipService
+ */
public MembershipService getMembershipService() {
return coordinator.getMembershipService();
}
+ /**
+ * Sets the channel receiver component
+ * @param clusterReceiver ChannelReceiver
+ */
public void setChannelReceiver(ChannelReceiver clusterReceiver) {
coordinator.setClusterReceiver(clusterReceiver);
}
+ /**
+ * Sets the channel sender component
+ * @param clusterSender ChannelSender
+ */
public void setChannelSender(ChannelSender clusterSender) {
coordinator.setClusterSender(clusterSender);
}
-
+
+ /**
+ * Sets the membership component
+ * @param membershipService MembershipService
+ */
public void setMembershipService(MembershipService membershipService) {
coordinator.setMembershipService(membershipService);
}
-
+
+ /**
+ * Adds a membership listener to the channel.<br>
+ * Membership listeners are uniquely identified using the equals(Object)
method
+ * @param membershipListener MembershipListener
+ */
public void addMembershipListener(MembershipListener membershipListener) {
if (!this.membershipListeners.contains(membershipListener) )
this.membershipListeners.add(membershipListener);
}
+ /**
+ * Removes a membership listener from the channel.<br>
+ * Membership listeners are uniquely identified using the equals(Object)
method
+ * @param membershipListener MembershipListener
+ */
+
public void removeMembershipListener(MembershipListener
membershipListener) {
membershipListeners.remove(membershipListener);
}
+ /**
+ * Adds a channel listener to the channel.<br>
+ * Channel listeners are uniquely identified using the equals(Object)
method
+ * @param channelListener ChannelListener
+ */
public void addChannelListener(ChannelListener channelListener) {
if (!this.channelListeners.contains(channelListener) )
this.channelListeners.add(channelListener);
}
+ /**
+ *
+ * Removes a channel listener from the channel.<br>
+ * Channel listeners are uniquely identified using the equals(Object)
method
+ * @param channelListener ChannelListener
+ */
public void removeChannelListener(ChannelListener channelListener) {
channelListeners.remove(channelListener);
}
-
+
+ /**
+ * Returns an iterator of all the interceptors in this stack
+ * @return Iterator
+ */
public Iterator getInterceptors() {
return new InterceptorIterator(this.getNext(),this.coordinator);
}
+ /**
+ * Enables/disables the option check<br>
+ * Setting this to true will make the GroupChannel perform a conflict
check
+ * on the interceptors. If two interceptors are using the same option flag,
+ * an error is thrown upon start.
+ * @param optionCheck boolean
+ */
public void setOptionCheck(boolean optionCheck) {
this.optionCheck = optionCheck;
}
+ /**
+ * Configure local heartbeat sleep time<br>
+ * Only used when <code>getHeartbeat()==true</code>
+ * @param heartbeatSleeptime long - time in milliseconds to sleep between
heartbeats
+ */
public void setHeartbeatSleeptime(long heartbeatSleeptime) {
this.heartbeatSleeptime = heartbeatSleeptime;
}
+ /**
+ * Enables or disables local heartbeat.
+ * if <code>setHeartbeat(true)</code> is invoked then the channel will
start an internal
+ * thread to invoke <code>Channel.heartbeat()</code> every
<code>getHeartbeatSleeptime</code> milliseconds
+ * @param heartbeat boolean
+ */
public void setHeartbeat(boolean heartbeat) {
this.heartbeat = heartbeat;
}
+ /**
+ * @see #setOptionCheck(boolean)
+ * @return boolean
+ */
public boolean getOptionCheck() {
return optionCheck;
}
+ /**
+ * @see #setHeartbeat(boolean)
+ * @return boolean
+ */
public boolean getHeartbeat() {
return heartbeat;
}
-
+
+ /**
+ * Returns the sleep time in milliseconds that the internal heartbeat will
+ * sleep in between invocations of <code>Channel.heartbeat()</code>
+ * @return long
+ */
public long getHeartbeatSleeptime() {
return heartbeatSleeptime;
}
+ /**
+ *
+ * <p>Title: Interceptor Iterator</p>
+ *
+ * <p>Description: An iterator to loop through the interceptors in a
channel</p>
+ *
+ * @version 1.0
+ */
public static class InterceptorIterator implements Iterator {
private ChannelInterceptor end;
private ChannelInterceptor start;
@@ -358,6 +569,15 @@
}
}
+ /**
+ *
+ * <p>Title: Internal heartbeat thread</p>
+ *
+ * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a
thread of this class
+ * is created</p>
+ *
+ * @version 1.0
+ */
public static class HeartbeatThread extends Thread {
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(HeartbeatThread.class);
protected static int counter = 1;
@@ -389,7 +609,7 @@
} catch ( InterruptedException x ) {
interrupted();
} catch ( Exception x ) {
- log.error("Unable to send heartbeat through Tribes
interceptor stack.",x);
+ log.error("Unable to send heartbeat through Tribes
interceptor stack. Will try to sleep again.",x);
}//catch
}//while
}//run
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]