Author: fhanik Date: Tue May 16 10:01:22 2006 New Revision: 406989 URL: http://svn.apache.org/viewcvs?rev=406989&view=rev Log: Documented the group channel
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=406989&r1=406988&r2=406989&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Tue May 16 10:01:22 2006 @@ -39,33 +39,81 @@ import org.apache.catalina.tribes.UniqueId; /** + * The default implementation of a Channel.<br> * The GroupChannel manages the replication channel. It coordinates * message being sent and received with membership announcements. - * The channel has an chain of interceptors that can modify the message or perform other logic. - * It manages a complete cluster group, both membership and replication. + * The channel has an chain of interceptors that can modify the message or perform other logic.<br> + * It manages a complete group, both membership and replication. * @author Filip Hanik * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel { - private boolean heartbeat = true; + /** + * Flag to determine if the channel manages its own heartbeat + * If set to true, the channel will start a local thread for the heart beat. + */ + protected boolean heartbeat = true; + /** + * If <code>heartbeat == true</code> then how often do we want this + * heartbeat to run. default is one minute + */ protected long heartbeatSleeptime = 60*1000;//only run once a minute + + /** + * Internal heartbeat thread + */ protected HeartbeatThread hbthread = null; - private ChannelCoordinator coordinator = new ChannelCoordinator(); - private ChannelInterceptor interceptors = null; + /** + * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> + * - MembershipService<br> + * - ChannelSender <br> + * - ChannelReceiver<br> + */ + protected ChannelCoordinator coordinator = new ChannelCoordinator(); - private ArrayList membershipListeners = new ArrayList(); - private ArrayList channelListeners = new ArrayList(); - private boolean optionCheck = false; - + /** + * The first interceptor in the inteceptor stack. + * The interceptors are chained in a linked list, so we only need a reference to the + * first one + */ + protected ChannelInterceptor interceptors = null; + + /** + * A list of membership listeners that subscribe to membership announcements + */ + protected ArrayList membershipListeners = new ArrayList(); + + /** + * A list of channel listeners that subscribe to incoming messages + */ + protected ArrayList channelListeners = new ArrayList(); + + /** + * If set to true, the GroupChannel will check to make sure that + */ + protected boolean optionCheck = false; + /** + * Creates a GroupChannel. This constructor will also + * add the first interceptor in the GroupChannel.<br> + * The first interceptor is always the channel itself. + */ public GroupChannel() { addInterceptor(this); } /** - * Adds an interceptor to the stack for message processing + * Adds an interceptor to the stack for message processing<br> + * Interceptors are ordered in the way they are added.<br> + * <code>channel.addInterceptor(A);</code><br> + * <code>channel.addInterceptor(C);</code><br> + * <code>channel.addInterceptor(B);</code><br> + * Will result in a interceptor stack like this:<br> + * <code>A -> C -> B</code><br> + * The complete stack will look like this:<br> + * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br> * @param interceptor ChannelInterceptorBase */ public void addInterceptor(ChannelInterceptor interceptor) { @@ -86,21 +134,42 @@ } } + /** + * Sends a heartbeat through the interceptor stack.<br> + * Invoke this method from the application on a periodic basis if + * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code> + */ public void heartbeat() { super.heartbeat(); } /** - * Send a message to one or more members in the cluster - * @param destination Member[] - the destinations, null or zero length means all - * @param msg ClusterMessage - the message to send - * @param options int - sender options, see class documentation - * @return ClusterMessage[] - the replies from the members, if any. + * Send a message to the destinations specified + * @param destination Member[] - destination.length > 1 + * @param msg Serializable - the message to send + * @param options int - sender options, options can trigger guarantee levels and different interceptors to + * react to the message see class documentation for the <code>Channel</code> object.<br> + * @return UniqueId - the unique Id that was assigned to this message + * @throws ChannelException - if an error occurs processing the message + * @see org.apache.catalina.tribes.Channel */ public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException { return send(destination,msg,options,null); } + + /** + * + * @param destination Member[] - destination.length > 1 + * @param msg Serializable - the message to send + * @param options int - sender options, options can trigger guarantee levels and different interceptors to + * react to the message see class documentation for the <code>Channel</code> object.<br> + * @param handler - callback object for error handling and completion notification, used when a message is + * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled. + * @return UniqueId - the unique Id that was assigned to this message + * @throws ChannelException - if an error occurs processing the message + * @see org.apache.catalina.tribes.Channel + */ public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException { if ( msg == null ) throw new ChannelException("Cant send a NULL message"); try { @@ -114,6 +183,7 @@ options = options | SEND_OPTIONS_BYTE_MESSAGE; } else { b = XByteBuffer.serialize(msg); + options = options & (~SEND_OPTIONS_BYTE_MESSAGE); } data.setOptions(options); XByteBuffer buffer = new XByteBuffer(b.length+128,false); @@ -133,7 +203,14 @@ } - + /** + * Callback from the interceptor stack. <br> + * When a message is received from a remote node, this method will be invoked by + * the previous interceptor.<br> + * This method can also be used to send a message to other components within the same application, + * but its an extreme case, and you're probably better off doing that logic between the applications itself. + * @param msg ChannelMessage + */ public void messageReceived(ChannelMessage msg) { if ( msg == null ) return; try { @@ -155,6 +232,8 @@ } }//for if ((!rx) && (fwd instanceof RpcMessage)) { + //if we have a message that requires a response, + //but none was given, send back an immediate one sendNoRpcChannelReply((RpcMessage)fwd,source); } } catch ( Exception x ) { @@ -162,17 +241,29 @@ } } - protected void sendNoRpcChannelReply(RpcMessage msg, Member source) { + /** + * Sends a <code>NoRpcChannelReply</code> message to a member<br> + * This method gets invoked by the channel if a RPC message comes in + * and no channel listener accepts the message. This avoids timeout + * @param msg RpcMessage + * @param destination Member - the destination for the reply + */ + protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) { try { //avoid circular loop if ( msg instanceof RpcMessage.NoRpcChannelReply) return; RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid); - send(new Member[]{source},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS); + send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS); } catch ( Exception x ) { log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x); } } + /** + * memberAdded gets invoked by the interceptor below the channel + * and the channel will broadcast it to the membership listeners + * @param member Member - the new member + */ public void memberAdded(Member member) { //notify upwards for (int i=0; i<membershipListeners.size(); i++ ) { @@ -181,6 +272,11 @@ } } + /** + * memberDisappeared gets invoked by the interceptor below the channel + * and the channel will broadcast it to the membership listeners + * @param member Member - the member that left or crashed + */ public void memberDisappeared(Member member) { //notify upwards for (int i=0; i<membershipListeners.size(); i++ ) { @@ -189,6 +285,11 @@ } } + /** + * Sets up the default implementation interceptor stack + * if no interceptors have been added + * @throws ChannelException + */ protected synchronized void setupDefaultStack() throws ChannelException { if ( getFirstInterceptor() != null && @@ -210,6 +311,11 @@ } } + /** + * Validates the option flags that each interceptor is using and reports + * an error if two interceptor share the same flag. + * @throws ChannelException + */ protected void checkOptionFlags() throws ChannelException { StringBuffer conflicts = new StringBuffer(); ChannelInterceptor first = interceptors; @@ -239,6 +345,12 @@ } + /** + * Starts the channel + * @param svc int - what service to start + * @throws ChannelException + * @see org.apache.catalina.tribes.Channel#start(int) + */ public synchronized void start(int svc) throws ChannelException { setupDefaultStack(); if (optionCheck) checkOptionFlags(); @@ -249,6 +361,12 @@ } } + /** + * Stops the channel + * @param svc int + * @throws ChannelException + * @see org.apache.catalina.tribes.Channel#stop(int) + */ public synchronized void stop(int svc) throws ChannelException { if (hbthread != null) { hbthread.stopHeartbeat(); @@ -257,81 +375,174 @@ super.stop(svc); } + /** + * Returns the first interceptor of the stack. Useful for traversal. + * @return ChannelInterceptor + */ public ChannelInterceptor getFirstInterceptor() { if (interceptors != null) return interceptors; else return coordinator; } + /** + * Returns the channel receiver component + * @return ChannelReceiver + */ public ChannelReceiver getChannelReceiver() { return coordinator.getClusterReceiver(); } - + + /** + * Returns the channel sender component + * @return ChannelSender + */ public ChannelSender getChannelSender() { return coordinator.getClusterSender(); } + /** + * Returns the membership service component + * @return MembershipService + */ public MembershipService getMembershipService() { return coordinator.getMembershipService(); } + /** + * Sets the channel receiver component + * @param clusterReceiver ChannelReceiver + */ public void setChannelReceiver(ChannelReceiver clusterReceiver) { coordinator.setClusterReceiver(clusterReceiver); } + /** + * Sets the channel sender component + * @param clusterSender ChannelSender + */ public void setChannelSender(ChannelSender clusterSender) { coordinator.setClusterSender(clusterSender); } - + + /** + * Sets the membership component + * @param membershipService MembershipService + */ public void setMembershipService(MembershipService membershipService) { coordinator.setMembershipService(membershipService); } - + + /** + * Adds a membership listener to the channel.<br> + * Membership listeners are uniquely identified using the equals(Object) method + * @param membershipListener MembershipListener + */ public void addMembershipListener(MembershipListener membershipListener) { if (!this.membershipListeners.contains(membershipListener) ) this.membershipListeners.add(membershipListener); } + /** + * Removes a membership listener from the channel.<br> + * Membership listeners are uniquely identified using the equals(Object) method + * @param membershipListener MembershipListener + */ + public void removeMembershipListener(MembershipListener membershipListener) { membershipListeners.remove(membershipListener); } + /** + * Adds a channel listener to the channel.<br> + * Channel listeners are uniquely identified using the equals(Object) method + * @param channelListener ChannelListener + */ public void addChannelListener(ChannelListener channelListener) { if (!this.channelListeners.contains(channelListener) ) this.channelListeners.add(channelListener); } + /** + * + * Removes a channel listener from the channel.<br> + * Channel listeners are uniquely identified using the equals(Object) method + * @param channelListener ChannelListener + */ public void removeChannelListener(ChannelListener channelListener) { channelListeners.remove(channelListener); } - + + /** + * Returns an iterator of all the interceptors in this stack + * @return Iterator + */ public Iterator getInterceptors() { return new InterceptorIterator(this.getNext(),this.coordinator); } + /** + * Enables/disables the option check<br> + * Setting this to true, will make the GroupChannel perform a conflict check + * on the interceptors. If two interceptors are using the same option flag + * and throw an error upon start. + * @param optionCheck boolean + */ public void setOptionCheck(boolean optionCheck) { this.optionCheck = optionCheck; } + /** + * Configure local heartbeat sleep time<br> + * Only used when <code>getHeartbeat()==true</code> + * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats + */ public void setHeartbeatSleeptime(long heartbeatSleeptime) { this.heartbeatSleeptime = heartbeatSleeptime; } + /** + * Enables or disables local heartbeat. + * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal + * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds + * @param heartbeat boolean + */ public void setHeartbeat(boolean heartbeat) { this.heartbeat = heartbeat; } + /** + * @see #setOptionCheck(boolean) + * @return boolean + */ public boolean getOptionCheck() { return optionCheck; } + /** + * @see #setHeartbeat(boolean) + * @return boolean + */ public boolean getHeartbeat() { return heartbeat; } - + + /** + * Returns the sleep time in milliseconds that the internal heartbeat will + * sleep in between invokations of <code>Channel.heartbeat()</code> + * @return long + */ public long getHeartbeatSleeptime() { return heartbeatSleeptime; } + /** + * + * <p>Title: Interceptor Iterator</p> + * + * <p>Description: An iterator to loop through the interceptors in a channel</p> + * + * @version 1.0 + */ public static class InterceptorIterator implements Iterator { private ChannelInterceptor end; private ChannelInterceptor start; @@ -358,6 +569,15 @@ } } + /** + * + * <p>Title: Internal heartbeat thread</p> + * + * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class + * is created</p> + * + * @version 1.0 + */ public static class HeartbeatThread extends Thread { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(HeartbeatThread.class); protected static int counter = 1; @@ -389,7 +609,7 @@ } catch ( InterruptedException x ) { interrupted(); } catch ( Exception x ) { - log.error("Unable to send heartbeat through Tribes interceptor stack.",x); + log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x); }//catch }//while }//run --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]