Adds a threadsafe latch to the DeltaManager which is used to block
processing of cluster messages until local applications have completed
initialization.

Includes changes to the DeltaManager to create the latch based on
configuration, changes to Catalina to automatically open the latch when
initialization is complete, a constant used as the attribute key in the
ServletContext, and a modification to mbeans-descriptors.xml for the new
DeltaManager attribute.

Also includes a handful of style cleanups interspersed throughout (spelling,
removing compiler warnings about type checking, removing spaces before
semicolons, and removing blank lines before and after braces).

  - Jason

"That's the problem. He's a brilliant lunatic and you can't tell
which way he'll jump --
like his game he's impossible to analyse --
you can't dissect him, predict him --
which of course means he's not a lunatic at all."


Index: catalina/Globals.java
===================================================================
--- catalina/Globals.java    (revision 719433)
+++ catalina/Globals.java    (working copy)
@@ -36,6 +36,14 @@
         "org.apache.catalina.deploy.alt_dd";

     /**
+     * The servlet context attribute under which we store the concurrent latch
+     * used to block processing cluster messages until after local application
+     * initialization
+     */
+    public static final String CLUSTER_DELAY =
+        "org.apache.catalina.ha.delay";
+
+    /**
      * The request attribute under which we store the array of X509Certificate
      * objects representing the certificate chain presented by our client,
      * if any.
Index: catalina/ha/session/DeltaManager.java
===================================================================
--- catalina/ha/session/DeltaManager.java    (revision 719433)
+++ catalina/ha/session/DeltaManager.java    (working copy)
@@ -26,11 +26,15 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;

 import org.apache.catalina.Cluster;
 import org.apache.catalina.Container;
 import org.apache.catalina.Context;
 import org.apache.catalina.Engine;
+import org.apache.catalina.Globals;
 import org.apache.catalina.Host;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.LifecycleListener;
@@ -38,13 +42,13 @@
 import org.apache.catalina.Valve;
 import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
 import org.apache.catalina.ha.ClusterMessage;
 import org.apache.catalina.ha.tcp.ReplicationValve;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.ReplicationStream;
 import org.apache.catalina.util.LifecycleSupport;
 import org.apache.catalina.util.StringManager;
-import org.apache.catalina.ha.ClusterManager;

 /**
  * The DeltaManager manages replicated sessions by only replicating the deltas
@@ -62,10 +66,11 @@
  * @author Craig R. McClanahan
  * @author Jean-Francois Arcand
  * @author Peter Rossbach
+ * @author Jason Lunn
  * @version $Revision$ $Date$
  */

-public class DeltaManager extends ClusterManagerBase{
+public class DeltaManager extends ClusterManagerBase {

     // ---------------------------------------------------- Security Classes
     public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
@@ -106,6 +111,18 @@
     protected LifecycleSupport lifecycle = new LifecycleSupport(this);

     /**
+     * Flag indicating that messageReceived(ClusterMessage) should block until
+     * all local applications have completed initialization
+     */
+    protected boolean delay = false;
+
+    /**
+     * Barrier used to receive notification from other threads that it is okay
+     * to process incoming messages from the cluster
+     */
+    private CountDownLatch gate = null;
+
+    /**
      * The maximum number of active Sessions allowed, or -1 for no limit.
      */
     private int maxActiveSessions = -1;
@@ -121,8 +138,8 @@
     /**
      * wait time between send session block (default 2 sec)
      */
-    private int sendAllSessionsWaitTime = 2 * 1000 ;
-    private ArrayList receivedMessageQueue = new ArrayList() ;
+    private int sendAllSessionsWaitTime = 2 * 1000;
+    private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ;
     private boolean receiverQueue = false ;
     private boolean stateTimestampDrop = true ;
     private long stateTransferCreateSendTime;
@@ -175,6 +192,27 @@
     public String getName() {
         return name;
     }
+
+    /**
+     * Set the member that indicates processing messages should wait for local
+     * initialization of applications to complete
+     *
+     * @param delay If true, processing messageReceived(ClusterMessage) will
+     * block until local web applications have completed initialization.
+     */
+    public void setDelay ( boolean delay ) {
+        this.delay = delay;
+    }
+
+    /**
+     * Gets the delay flag
+     * @return delay - boolean flag indicating that
+     * messageReceived(ClusterMessage) should block until local initialization
+     * of applications has completed
+     */
+    public boolean getDelay () {
+        return delay;
+    }

     /**
      * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
@@ -781,6 +819,24 @@
         lifecycle.removeLifecycleListener(listener);
     }

+    /**
+     * If this.delay is true, create a gate that will be opened when
+     * local application initialization is complete
+     */
+    @Override
+    public void init () {
+        if (delay && container != null) {
+            if (container instanceof StandardContext) {
+                ServletContext servletContext = ((StandardContext) container).getServletContext();
+                if (servletContext != null) {
+                    gate = new CountDownLatch(1);
+                    servletContext.setAttribute(Globals.CLUSTER_DELAY, gate);
+                }
+            }
+        }
+        super.init();
+    }
+
     /**
      * Prepare for the beginning of active use of the public methods of this
      * component. This method should be called after <code>configure()</code>,
@@ -885,8 +941,8 @@
                 waitForSendAllSessions(beforeSendTime);
             } finally {
                 synchronized(receivedMessageQueue) {
-                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
-                        SessionMessage smsg = (SessionMessage) iter.next();
+                    for (Iterator<SessionMessage> iter = receivedMessageQueue.iterator(); iter.hasNext();) {
+                        SessionMessage smsg = iter.next();
                         if (!stateTimestampDrop) {
                             messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                         } else {
@@ -1068,6 +1124,18 @@
      *            the message received.
      */
     public void messageDataReceived(ClusterMessage cmsg) {
+        // Block processing until local application initialization has
+        // completed, if a gate has been erected
+        if(gate != null) {
+            try {
+                gate.await();
+                gate = null;
+            }
+            catch(InterruptedException e) {
+                log.error(e, e);
+            }
+        }
+
         if (cmsg != null && cmsg instanceof SessionMessage) {
             SessionMessage msg = (SessionMessage) cmsg;
             switch (msg.getEventType()) {
@@ -1535,7 +1603,8 @@
         result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
         result.receiverQueue = receiverQueue ;
         result.stateTimestampDrop = stateTimestampDrop ;
-        result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+        result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+        result.delay = delay;
         return result;
     }
 }
Index: catalina/ha/session/mbeans-descriptors.xml
===================================================================
--- catalina/ha/session/mbeans-descriptors.xml    (revision 719433)
+++ catalina/ha/session/mbeans-descriptors.xml    (working copy)
@@ -268,7 +268,7 @@
     <attribute
       name="expireSessionsOnShutdown"
       is="true"
-      description="exipre all sessions cluster wide as one node goes down"
+      description="expire all sessions cluster wide as one node goes down"
       type="boolean"/>
     <attribute
       name="notifyListenersOnReplication"
@@ -293,6 +293,10 @@
       name="sendAllSessionsWaitTime"
       description="wait time between send session block (default 2 sec)"
       type="int"/>
+    <attribute
+      name="delay"
+      description="wait until local applications have initialized before processing cluster messages"
+      type="boolean"/>
     <operation
       name="listSessionIds"
       description="Return the list of active session ids"
Index: catalina/startup/Catalina.java
===================================================================
--- catalina/startup/Catalina.java    (revision 719433)
+++ catalina/startup/Catalina.java    (working copy)
@@ -28,11 +28,18 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;

 import org.apache.catalina.Container;
+import org.apache.catalina.Globals;
 import org.apache.catalina.Lifecycle;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.Server;
+import org.apache.catalina.Service;
+import org.apache.catalina.core.ContainerBase;
+import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.core.StandardServer;
 import org.apache.tomcat.util.digester.Digester;
 import org.apache.tomcat.util.digester.Rule;
@@ -56,6 +63,7 @@
  *
  * @author Craig R. McClanahan
  * @author Remy Maucherat
+ * @author Jason Lunn
  * @version $Revision$ $Date$
  */

@@ -579,7 +587,18 @@
                 log.error("Catalina.start: ", e);
             }
         }
-
+
+        // Open any gates stored in ServletContexts to allow the processing of
+        // cluster messages once local applications have been initialized
+        Service [] services = server.findServices();
+        if ( services != null ) {
+            for ( Service service : services ) {
+                if ( service != null ) {
+                    openContainerGates( service.getContainer() );
+                }
+            }
+        }
+
         long t2 = System.nanoTime();
         if(log.isInfoEnabled())
             log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
@@ -601,7 +620,6 @@
             await();
             stop();
         }
-
     }


@@ -609,7 +627,6 @@
      * Stop an existing server instance.
      */
     public void stop() {
-
         try {
             // Remove the ShutdownHook first so that server.stop()
             // doesn't get invoked twice
@@ -629,7 +646,6 @@
                 log.error("Catalina.stop", e);
             }
         }
-
     }


@@ -637,9 +653,7 @@
      * Await and shutdown.
      */
     public void await() {
-
         server.await();
-
     }


@@ -647,15 +661,50 @@
      * Print usage information for this application.
      */
     protected void usage() {
-
         System.out.println
             ("usage: java org.apache.catalina.startup.Catalina"
              + " [ -config {pathname} ]"
              + " [ -nonaming ] { start | stop }");
+    }

+
+    /**
+     * Traverses the argument container and decrements the CountDownLatch
+     * found in the servlet context with attribute key Globals.CLUSTER_DELAY
+     * if found
+     * @param container Possibly null Container instance
+     */
+    protected void openContainerGates ( Container container ) {
+        if (container == null) {
+            return;
+        }
+
+        if ( container instanceof StandardContext ) {
+            StandardContext context =
+                (StandardContext)container;
+            ServletContext servletContext =
+                context.getServletContext();
+            if (servletContext != null) {
+                Object contextAttribute = servletContext
+                        .getAttribute(Globals.CLUSTER_DELAY);
+                if (contextAttribute != null &&
+                        contextAttribute instanceof CountDownLatch) {
+                    CountDownLatch gate =
+                        (CountDownLatch) contextAttribute;
+                    gate.countDown();
+                }
+            }
+        } else if ( container instanceof ContainerBase ) {
+            ContainerBase base = (ContainerBase)container;
+            Container [] containers = base.findChildren();
+            if ( containers != null ) {
+                for ( Container childContainer : containers ) {
+                    openContainerGates( childContainer );
+                }
+            }
+        }
     }

-
     // --------------------------------------- CatalinaShutdownHook Inner Class

     // XXX Should be moved to embedded !
@@ -709,6 +758,4 @@
         top.setParentClassLoader(parentClassLoader);

     }
-
-
 }
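
For anyone who wants to try the gating idea without building Tomcat, here is a minimal standalone sketch of the same pattern using only java.util.concurrent. GatedReceiver, openGate() and demo() are hypothetical names made up for illustration; they are not part of the patch or of Tomcat. The sketch differs from the patch in two ways that are assumptions on my part: it reads the latch field once into a local variable, which is one way to avoid a possible NullPointerException if another receiver thread clears the field between the null check and await(), and it restores the interrupt status instead of only logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a cluster message receiver; not Tomcat's DeltaManager.
class GatedReceiver {
    // volatile so that clearing the field is visible to other receiver threads
    private volatile CountDownLatch gate;
    private final List<String> processed = new ArrayList<>();

    GatedReceiver(boolean delay) {
        this.gate = delay ? new CountDownLatch(1) : null;
    }

    // Called once local initialization is complete (the container's job in the patch).
    void openGate() {
        CountDownLatch g = gate;
        if (g != null) {
            g.countDown();
        }
    }

    void messageDataReceived(String msg) {
        CountDownLatch g = gate; // read the field once, then use only the local
        if (g != null) {
            try {
                g.await();
                gate = null; // later calls skip the latch entirely
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return; // assumption: drop the message rather than process it early
            }
        }
        synchronized (processed) {
            processed.add(msg);
        }
    }

    int processedCount() {
        synchronized (processed) {
            return processed.size();
        }
    }

    // Returns {messages processed before the gate opened, messages processed after}.
    static int[] demo() {
        GatedReceiver receiver = new GatedReceiver(true);
        Thread clusterThread = new Thread(() -> receiver.messageDataReceived("session-delta"));
        clusterThread.start();
        try {
            Thread.sleep(100); // give the receiver thread time to reach the gate
            int before = receiver.processedCount();
            receiver.openGate();
            clusterThread.join();
            return new int[] { before, receiver.processedCount() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("before open: " + counts[0] + ", after open: " + counts[1]);
    }
}
```

Running main() should show that nothing is processed while the gate is closed and that the queued message goes through once openGate() is called.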



On Fri, Nov 21, 2008 at 10:42 PM, Peter Rossbach <[EMAIL PROTECTED]> wrote:

> Hi Jason,
>
> send us your implementation and let us review your stuff :-)
>
> You can also register a ContextListener at DeltaManager.setContainer() to
> control your latch.
> Are you sure that the session sync message (GET ALL Session) is received
> before first request at second node
> is processed?
>
> I think your feature is an extension of the current receivedQueue usage!
>
> Regards
> Peter
>
>
>
> On 20.11.2008 at 22:54, Jason wrote:
>
>
>> This message is targeted at Filip Hanik, Craig R. McClanahan,
>> Jean-Francois Arcand, Peter Rossbach or anyone with a direct interest in
>> the DeltaManager implementation in Tomcat 6.
>>
>> A vendor (who will remain nameless) whose product I support for a client
>> recently gave me an idea for a patch to DeltaManager to address what the
>> vendor claims is a Tomcat-specific issue related to session replication.
>> I'm wondering if it would be of value to the community or if the
>> "problem" it is trying to remedy is an intentional "feature".
>>
>> The primary issue is that, according to vendor engineering support, the
>> other application containers the vendor supports deploying their product
>> on, including WebSphere, WebLogic, et al, wait until after local
>> applications have been initialized before processing incoming messages
>> from the cluster that could include deserializing remote sessions and the
>> objects therein. I have not confirmed this by examining the other
>> containers, mind you, but am pretty confident that this is an accurate
>> statement insofar as the vendor's product works in those environments but
>> does not work in a clustered Tomcat environment.
>>
>> The reason it fails in Tomcat is that some of the objects in the
>> serialized session make calls at construction time to the vendor's
>> (archaic) preferences API's static methods, which are not initialized
>> properly until the web application itself is started. The result is that
>> the first node in the cluster starts up fine, but the 2nd-Nth nodes die a
>> horrible death trying to deserialize remote sessions populated by the
>> first node.
>>
>> The workaround we've implemented locally is a simple one: we extend the
>> DeltaManager with a custom class. Therein, we create a latch
>> (java.util.concurrent.CountDownLatch, to be specific) and save it in the
>> ServletContext. The only overridden method is messageDataReceived(), which
>> uses the latch.await() method to block before calling the original
>> implementation of the parent messageDataReceived() method.
>>
>> The vendor's application (or, more properly, the custom extensions we've
>> built on their platform) looks at the ServletContext for a latch after the
>> preferences have been initialized locally, and calls latch.countDown(),
>> allowing any blocked calls to messageDataReceived() to start executing
>> normally.
>>
>> Without breaking the current sequence of initializing the session
>> replication code before local applications that Tomcat developers may
>> have come to expect, it seems like there is a potential solution here
>> that might enable applications like the one I've got to support to choose
>> to configure the session replication to wait to process incoming messages
>> until after the application has started.
>>
>> I think it would be pretty trivial for me to offer a patch to
>> DeltaManager that created a latch based on a configuration element. One
>> could imagine an automatic mechanism for toggling the latch by the
>> container after the application initialization, or deferring to the
>> application to deactivate. The question is, does anybody want such
>> functionality besides me? The corollary is, if being able to choose when
>> session replication begins is a desirable feature, is this the right
>> tactic to implement it?
>>
>> Sincerely,
>>
>>  - Jason Lunn
>>
>> "That's the problem. He's a brilliant lunatic and you can't tell
>> which way he'll jump --
>> like his game he's impossible to analyse --
>> you can't dissect him, predict him --
>> which of course means he's not a lunatic at all."
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [EMAIL PROTECTED]
> For additional commands, e-mail: [EMAIL PROTECTED]
>
>
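
Peter's suggestion of registering a listener when the manager is given its container could look roughly like the sketch below. This is only an illustration under stated assumptions: StartListener, FakeContext and GatedManager are hypothetical stand-ins written for this example, not Tomcat's Lifecycle or Context API, and the real hook points would need to be checked against the Tomcat 6 sources. The idea shown is that each manager opens its own gate when its own context reports that it has started, rather than waiting for Catalina to walk the container tree after the whole server is up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ListenerGateDemo {
    // Hypothetical stand-in for a context start notification; not Tomcat's API.
    interface StartListener {
        void started(String contextName);
    }

    // Hypothetical stand-in for a web application context.
    static class FakeContext {
        private final String name;
        private final List<StartListener> listeners = new ArrayList<>();

        FakeContext(String name) {
            this.name = name;
        }

        void addStartListener(StartListener listener) {
            listeners.add(listener);
        }

        // Simulates the container finishing startup of this application.
        void finishStart() {
            for (StartListener listener : listeners) {
                listener.started(name);
            }
        }
    }

    // Hypothetical manager that opens its own gate when its context has started.
    static class GatedManager {
        private final CountDownLatch gate = new CountDownLatch(1);

        void setContainer(FakeContext context) {
            context.addStartListener(contextName -> gate.countDown());
        }

        // Returns true if the gate opened within the timeout.
        boolean awaitGate(long timeoutMillis) {
            try {
                return gate.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Returns {gate open before start, gate open after start}.
    static boolean[] demo() {
        FakeContext context = new FakeContext("/app");
        GatedManager manager = new GatedManager();
        manager.setContainer(context);
        boolean before = manager.awaitGate(50);
        context.finishStart();
        boolean after = manager.awaitGate(50);
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] open = demo();
        System.out.println("open before start: " + open[0] + ", open after start: " + open[1]);
    }
}
```

A per-context listener like this would keep the latch private to the manager, so nothing would need to be stored in the ServletContext, at the cost of tying the manager to the context's start notification.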