Author: fhanik
Date: Tue May 16 10:01:22 2006
New Revision: 406989

URL: http://svn.apache.org/viewcvs?rev=406989&view=rev
Log:
Documented the group channel

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=406989&r1=406988&r2=406989&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Tue May 16 10:01:22 2006
@@ -39,33 +39,81 @@
 import org.apache.catalina.tribes.UniqueId;
 
 /**
+ * The default implementation of a Channel.<br>
  * The GroupChannel manages the replication channel. It coordinates
  * message being sent and received with membership announcements.
- * The channel has an chain of interceptors that can modify the message or 
perform other logic.
- * It manages a complete cluster group, both membership and replication.
+ * The channel has a chain of interceptors that can modify the message or 
perform other logic.<br>
+ * It manages a complete group, both membership and replication.
  * @author Filip Hanik
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
  */
 public class GroupChannel extends ChannelInterceptorBase implements 
ManagedChannel {
-    private boolean heartbeat = true;
+    /**
+     * Flag to determine if the channel manages its own heartbeat
+     * If set to true, the channel will start a local thread for the heart 
beat.
+     */
+    protected boolean heartbeat = true;
+    /**
+     * If <code>heartbeat == true</code> then how often do we want this 
+     * heartbeat to run. default is one minute
+     */
     protected long heartbeatSleeptime = 60*1000;//only run once a minute
+    
+    /**
+     * Internal heartbeat thread
+     */
     protected HeartbeatThread hbthread = null;
     
-    private ChannelCoordinator coordinator = new ChannelCoordinator();
-    private ChannelInterceptor interceptors = null;
+    /**
+     * The  <code>ChannelCoordinator</code> coordinates the bottom layer 
components:<br>
+     * - MembershipService<br>
+     * - ChannelSender <br>
+     * - ChannelReceiver<br>
+     */
+    protected ChannelCoordinator coordinator = new ChannelCoordinator();
     
-    private ArrayList membershipListeners = new ArrayList();
-    private ArrayList channelListeners = new ArrayList();
-    private boolean optionCheck = false;
-
+    /**
+     * The first interceptor in the interceptor stack.
+     * The interceptors are chained in a linked list, so we only need a 
reference to the 
+     * first one
+     */
+    protected ChannelInterceptor interceptors = null;
+    
+    /**
+     * A list of membership listeners that subscribe to membership 
announcements
+     */
+    protected ArrayList membershipListeners = new ArrayList();
+    
+    /**
+     * A list of channel listeners that subscribe to incoming messages
+     */
+    protected ArrayList channelListeners = new ArrayList();
+    
+    /**
+     * If set to true, the GroupChannel will check on start that no two interceptors share the same option flag
+     */
+    protected boolean optionCheck = false;
 
+    /**
+     * Creates a GroupChannel. This constructor will also 
+     * add the first interceptor in the GroupChannel.<br>
+     * The first interceptor is always the channel itself.
+     */
     public GroupChannel() {
         addInterceptor(this);
     }
     
     
     /**
-     * Adds an interceptor to the stack for message processing
+     * Adds an interceptor to the stack for message processing<br>
+     * Interceptors are ordered in the way they are added.<br>
+     * <code>channel.addInterceptor(A);</code><br>
+     * <code>channel.addInterceptor(C);</code><br>
+     * <code>channel.addInterceptor(B);</code><br>
+     * Will result in an interceptor stack like this:<br>
+     * <code>A -> C -> B</code><br>
+     * The complete stack will look like this:<br>
+     * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
      * @param interceptor ChannelInterceptorBase
      */
     public void addInterceptor(ChannelInterceptor interceptor) { 
@@ -86,21 +134,42 @@
         }
     }
     
+    /**
+     * Sends a heartbeat through the interceptor stack.<br>
+     * Invoke this method from the application on a periodic basis if
+     * you have turned off internal heartbeats 
<code>channel.setHeartbeat(false)</code>
+     */
     public void heartbeat() {
         super.heartbeat();
     }
     
     
     /**
-     * Send a message to one or more members in the cluster
-     * @param destination Member[] - the destinations, null or zero length 
means all
-     * @param msg ClusterMessage - the message to send
-     * @param options int - sender options, see class documentation
-     * @return ClusterMessage[] - the replies from the members, if any.
+     * Send a message to the destinations specified
+     * @param destination Member[] - destination.length > 1
+     * @param msg Serializable - the message to send
+     * @param options int - sender options, options can trigger guarantee 
levels and different interceptors to
+     * react to the message; see class documentation for the 
<code>Channel</code> object.<br>
+     * @return UniqueId - the unique Id that was assigned to this message
+     * @throws ChannelException - if an error occurs processing the message
+     * @see org.apache.catalina.tribes.Channel
      */
     public UniqueId send(Member[] destination, Serializable msg, int options) 
throws ChannelException {
         return send(destination,msg,options,null);
     }
+    
+    /**
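+     * Send a message to the destinations specified, with an optional callback for error handling and completion notification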
+     * @param destination Member[] - destination.length > 1
+     * @param msg Serializable - the message to send
+     * @param options int - sender options, options can trigger guarantee 
levels and different interceptors to
+     * react to the message; see class documentation for the 
<code>Channel</code> object.<br>
+     * @param handler - callback object for error handling and completion 
notification, used when a message is 
+     * sent asynchronously using the 
<code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
+     * @return UniqueId - the unique Id that was assigned to this message
+     * @throws ChannelException - if an error occurs processing the message
+     * @see org.apache.catalina.tribes.Channel
+     */
     public UniqueId send(Member[] destination, Serializable msg, int options, 
ErrorHandler handler) throws ChannelException {
         if ( msg == null ) throw new ChannelException("Cant send a NULL 
message");
         try {
@@ -114,6 +183,7 @@
                 options = options | SEND_OPTIONS_BYTE_MESSAGE;
             } else {
                 b = XByteBuffer.serialize(msg);
+                options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
             }
             data.setOptions(options);
             XByteBuffer buffer = new XByteBuffer(b.length+128,false);
@@ -133,7 +203,14 @@
     }
     
 
-    
+    /**
+     * Callback from the interceptor stack. <br>
+     * When a message is received from a remote node, this method will be 
invoked by
+     * the previous interceptor.<br>
+     * This method can also be used to send a message to other components 
within the same application,
+     * but it's an extreme case, and you're probably better off doing that 
logic between the applications itself.
+     * @param msg ChannelMessage
+     */
     public void messageReceived(ChannelMessage msg) {
         if ( msg == null ) return;
         try {
@@ -155,6 +232,8 @@
                 }
             }//for
             if ((!rx) && (fwd instanceof RpcMessage)) {
+                //if we have a message that requires a response,
+                //but none was given, send back an immediate one
                 sendNoRpcChannelReply((RpcMessage)fwd,source);
             }
         } catch ( Exception x ) {
@@ -162,17 +241,29 @@
         }
     }
     
-    protected void sendNoRpcChannelReply(RpcMessage msg, Member source) {
+    /**
+     * Sends a <code>NoRpcChannelReply</code> message to a member<br>
+     * This method gets invoked by the channel if a RPC message comes in
+     * and no channel listener accepts the message. This avoids timeout
+     * @param msg RpcMessage
+     * @param destination Member - the destination for the reply
+     */
+    protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
         try {
             //avoid circular loop
             if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
             RpcMessage.NoRpcChannelReply reply = new 
RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
-            send(new Member[]{source},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
+            send(new 
Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
         } catch ( Exception x ) {
             log.error("Unable to find rpc channel, failed to send 
NoRpcChannelReply.",x);
         }
     }
     
+    /**
+     * memberAdded gets invoked by the interceptor below the channel
+     * and the channel will broadcast it to the membership listeners
+     * @param member Member - the new member
+     */
     public void memberAdded(Member member) {
         //notify upwards
         for (int i=0; i<membershipListeners.size(); i++ ) {
@@ -181,6 +272,11 @@
         }
     }
     
+    /**
+     * memberDisappeared gets invoked by the interceptor below the channel
+     * and the channel will broadcast it to the membership listeners
+     * @param member Member - the member that left or crashed
+     */
     public void memberDisappeared(Member member) {
         //notify upwards
         for (int i=0; i<membershipListeners.size(); i++ ) {
@@ -189,6 +285,11 @@
         }
     }
     
+    /**
+     * Sets up the default implementation interceptor stack
+     * if no interceptors have been added
+     * @throws ChannelException
+     */
     protected synchronized void setupDefaultStack() throws ChannelException {
         
         if ( getFirstInterceptor() != null && 
@@ -210,6 +311,11 @@
         }
     }
     
+    /**
+     * Validates the option flags that each interceptor is using and reports
+     * an error if two interceptors share the same flag.
+     * @throws ChannelException
+     */
     protected void checkOptionFlags() throws ChannelException {
         StringBuffer conflicts = new StringBuffer();
         ChannelInterceptor first = interceptors;
@@ -239,6 +345,12 @@
     
     }
     
+    /**
+     * Starts the channel
+     * @param svc int - what service to start
+     * @throws ChannelException
+     * @see org.apache.catalina.tribes.Channel#start(int)
+     */
     public synchronized void start(int svc) throws ChannelException {
         setupDefaultStack();
         if (optionCheck) checkOptionFlags();
@@ -249,6 +361,12 @@
         }
     }
     
+    /**
+     * Stops the channel
+     * @param svc int
+     * @throws ChannelException
+     * @see org.apache.catalina.tribes.Channel#stop(int)
+     */
     public synchronized void stop(int svc) throws ChannelException {
         if (hbthread != null) {
             hbthread.stopHeartbeat();
@@ -257,81 +375,174 @@
         super.stop(svc);
     }
     
+    /**
+     * Returns the first interceptor of the stack. Useful for traversal.
+     * @return ChannelInterceptor
+     */
     public ChannelInterceptor getFirstInterceptor() {
         if (interceptors != null) return interceptors;
         else return coordinator;
     }
     
+    /**
+     * Returns the channel receiver component
+     * @return ChannelReceiver
+     */
     public ChannelReceiver getChannelReceiver() {
         return coordinator.getClusterReceiver();
     }
-
+    
+    /**
+     * Returns the channel sender component
+     * @return ChannelSender
+     */
     public ChannelSender getChannelSender() {
         return coordinator.getClusterSender();
     }
 
+    /**
+     * Returns the membership service component
+     * @return MembershipService
+     */
     public MembershipService getMembershipService() {
         return coordinator.getMembershipService();
     }
     
+    /**
+     * Sets the channel receiver component
+     * @param clusterReceiver ChannelReceiver
+     */
     public void setChannelReceiver(ChannelReceiver clusterReceiver) {
         coordinator.setClusterReceiver(clusterReceiver);
     }
 
+    /**
+     * Sets the channel sender component
+     * @param clusterSender ChannelSender
+     */
     public void setChannelSender(ChannelSender clusterSender) {
         coordinator.setClusterSender(clusterSender);
     }
-
+    
+    /**
+     * Sets the membership component
+     * @param membershipService MembershipService
+     */
     public void setMembershipService(MembershipService membershipService) {
         coordinator.setMembershipService(membershipService);
     }
-
+    
+    /**
+     * Adds a membership listener to the channel.<br>
+     * Membership listeners are uniquely identified using the equals(Object) 
method
+     * @param membershipListener MembershipListener
+     */
     public void addMembershipListener(MembershipListener membershipListener) {
         if (!this.membershipListeners.contains(membershipListener) )
             this.membershipListeners.add(membershipListener);
     }
 
+    /**
+     * Removes a membership listener from the channel.<br>
+     * Membership listeners are uniquely identified using the equals(Object) 
method
+     * @param membershipListener MembershipListener
+     */
+    
     public void removeMembershipListener(MembershipListener 
membershipListener) {
         membershipListeners.remove(membershipListener);
     }
 
+    /**
+     * Adds a channel listener to the channel.<br>
+     * Channel listeners are uniquely identified using the equals(Object) 
method
+     * @param channelListener ChannelListener
+     */
     public void addChannelListener(ChannelListener channelListener) {
         if (!this.channelListeners.contains(channelListener) )
             this.channelListeners.add(channelListener);
     }
     
+    /**
+     * 
+     * Removes a channel listener from the channel.<br>
+     * Channel listeners are uniquely identified using the equals(Object) 
method
+     * @param channelListener ChannelListener
+     */
     public void removeChannelListener(ChannelListener channelListener) {
         channelListeners.remove(channelListener);
     }
-
+    
+    /**
+     * Returns an iterator of all the interceptors in this stack
+     * @return Iterator
+     */
     public Iterator getInterceptors() { 
         return new InterceptorIterator(this.getNext(),this.coordinator);
     }
 
+    /**
+     * Enables/disables the option check<br>
+     * Setting this to true will make the GroupChannel perform a conflict 
check
+     * on the interceptors. If two interceptors are using the same option flag,
+     * an error is thrown upon start.
+     * @param optionCheck boolean
+     */
     public void setOptionCheck(boolean optionCheck) {
         this.optionCheck = optionCheck;
     }
 
+    /**
+     * Configure local heartbeat sleep time<br>
+     * Only used when <code>getHeartbeat()==true</code>
+     * @param heartbeatSleeptime long - time in milliseconds to sleep between 
heartbeats
+     */
     public void setHeartbeatSleeptime(long heartbeatSleeptime) {
         this.heartbeatSleeptime = heartbeatSleeptime;
     }
 
+    /**
+     * Enables or disables local heartbeat.
+     * if <code>setHeartbeat(true)</code> is invoked then the channel will 
start an internal 
+     * thread to invoke <code>Channel.heartbeat()</code> every 
<code>getHeartbeatSleeptime</code> milliseconds
+     * @param heartbeat boolean
+     */
     public void setHeartbeat(boolean heartbeat) {
         this.heartbeat = heartbeat;
     }
 
+    /**
+     * @see #setOptionCheck(boolean)
+     * @return boolean
+     */
     public boolean getOptionCheck() {
         return optionCheck;
     }
 
+    /**
+     * @see #setHeartbeat(boolean)
+     * @return boolean
+     */
     public boolean getHeartbeat() {
         return heartbeat;
     }
-
+    
+    /**
+     * Returns the sleep time in milliseconds that the internal heartbeat will
+     * sleep in between invocations of <code>Channel.heartbeat()</code>
+     * @return long
+     */
     public long getHeartbeatSleeptime() {
         return heartbeatSleeptime;
     }
 
+    /**
+     * 
+     * <p>Title: Interceptor Iterator</p> 
+     * 
+     * <p>Description: An iterator to loop through the interceptors in a 
channel</p> 
+     * 
+     * @version 1.0
+     */
     public static class InterceptorIterator implements Iterator {
         private ChannelInterceptor end;
         private ChannelInterceptor start;
@@ -358,6 +569,15 @@
         }
     }
 
+    /**
+     * 
+     * <p>Title: Internal heartbeat thread</p> 
+     * 
+     * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a 
thread of this class
+     * is created</p> 
+     * 
+     * @version 1.0
+     */
     public static class HeartbeatThread extends Thread {
         protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(HeartbeatThread.class);
         protected static int counter = 1;
@@ -389,7 +609,7 @@
                 } catch ( InterruptedException x ) {
                     interrupted();
                 } catch ( Exception x ) {
-                    log.error("Unable to send heartbeat through Tribes 
interceptor stack.",x);
+                    log.error("Unable to send heartbeat through Tribes 
interceptor stack. Will try to sleep again.",x);
                 }//catch
             }//while
         }//run



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to