Modified: 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/ReplicationValve.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/ReplicationValve.java?view=diff&rev=466608&r1=466607&r2=466608
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/ReplicationValve.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/ReplicationValve.java
 Sat Oct 21 16:10:15 2006
@@ -1,658 +1,659 @@
-/*
- * Copyright 1999,2004-2005 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.ha.tcp;
-
-import java.io.IOException;
-import java.util.StringTokenizer;
-import java.util.regex.Pattern;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-import javax.servlet.ServletException;
-
-import org.apache.catalina.Manager;
-import org.apache.catalina.Session;
-import org.apache.catalina.Context;
-import org.apache.catalina.core.StandardContext;
-import org.apache.catalina.ha.CatalinaCluster;
-import org.apache.catalina.ha.ClusterManager;
-import org.apache.catalina.ha.ClusterMessage;
-import org.apache.catalina.ha.ClusterSession;
-import org.apache.catalina.ha.ClusterValve;
-import org.apache.catalina.ha.session.DeltaManager;
-import org.apache.catalina.ha.session.DeltaSession;
-import org.apache.catalina.connector.Request;
-import org.apache.catalina.connector.Response;
-import org.apache.catalina.util.StringManager;
-import org.apache.catalina.valves.ValveBase;
-
-/**
- * <p>Implementation of a Valve that logs interesting contents from the
- * specified Request (before processing) and the corresponding Response
- * (after processing).  It is especially useful in debugging problems
- * related to headers and cookies.</p>
- *
- * <p>This Valve may be attached to any Container, depending on the granularity
- * of the logging you wish to perform.</p>
- *
- * <p>primaryIndicator=true, then the request attribute 
<i>org.apache.catalina.ha.tcp.isPrimarySession.</i>
- * is set true, when request processing is at sessions primary node.
- * </p>
- *
- * @author Craig R. McClanahan
- * @author Filip Hanik
- * @author Peter Rossbach
- * @version $Revision: 375709 $ $Date: 2006-02-07 15:13:25 -0600 (Tue, 07 Feb 
2006) $
- */
-
-public class ReplicationValve
-    extends ValveBase implements ClusterValve {
-    
-    private static org.apache.commons.logging.Log log =
-        org.apache.commons.logging.LogFactory.getLog( ReplicationValve.class );
-
-    // ----------------------------------------------------- Instance Variables
-
-    /**
-     * The descriptive information related to this implementation.
-     */
-    private static final String info =
-        "org.apache.catalina.ha.tcp.ReplicationValve/2.0";
-
-
-    /**
-     * The StringManager for this package.
-     */
-    protected static StringManager sm =
-        StringManager.getManager(Constants.Package);
-
-    private CatalinaCluster cluster = null ;
-
-    /**
-     * holds file endings to not call for like images and others
-     */
-    protected java.util.regex.Pattern[] reqFilters = new 
java.util.regex.Pattern[0];
-    
-    /**
-     * Orginal filter 
-     */
-    protected String filter ;
-    
-    /**
-     * crossContext session container 
-     */
-    protected ThreadLocal crossContextSessions = new ThreadLocal() ;
-    
-    /**
-     * doProcessingStats (default = off)
-     */
-    protected boolean doProcessingStats = false;
-    
-    protected long totalRequestTime = 0;
-    protected long totalSendTime = 0;
-    protected long nrOfRequests = 0;
-    protected long lastSendTime = 0;
-    protected long nrOfFilterRequests = 0;
-    protected long nrOfSendRequests = 0;
-    protected long nrOfCrossContextSendRequests = 0;
-    
-    /**
-     * must primary change indicator set 
-     */
-    protected boolean primaryIndicator = false ;
-    
-    /**
-     * Name of primary change indicator as request attribute
-     */
-    protected String primaryIndicatorName = 
"org.apache.catalina.ha.tcp.isPrimarySession";
-   
-    // ------------------------------------------------------------- Properties
-
-    public ReplicationValve() {
-    }
-    
-    /**
-     * Return descriptive information about this Valve implementation.
-     */
-    public String getInfo() {
-
-        return (info);
-
-    }
-    
-    /**
-     * @return Returns the cluster.
-     */
-    public CatalinaCluster getCluster() {
-        return cluster;
-    }
-    
-    /**
-     * @param cluster The cluster to set.
-     */
-    public void setCluster(CatalinaCluster cluster) {
-        this.cluster = cluster;
-    }
- 
-    /**
-     * @return Returns the filter
-     */
-    public String getFilter() {
-       return filter ;
-    }
-
-    /**
-     * compile filter string to regular expressions
-     * @see Pattern#compile(java.lang.String)
-     * @param filter
-     *            The filter to set.
-     */
-    public void setFilter(String filter) {
-        if (log.isDebugEnabled())
-            log.debug(sm.getString("ReplicationValve.filter.loading", filter));
-        this.filter = filter;
-        StringTokenizer t = new StringTokenizer(filter, ";");
-        this.reqFilters = new Pattern[t.countTokens()];
-        int i = 0;
-        while (t.hasMoreTokens()) {
-            String s = t.nextToken();
-            if (log.isTraceEnabled())
-                log.trace(sm.getString("ReplicationValve.filter.token", s));
-            try {
-                reqFilters[i++] = Pattern.compile(s);
-            } catch (Exception x) {
-                log.error(sm.getString("ReplicationValve.filter.token.failure",
-                        s), x);
-            }
-        }
-    }
-
-    /**
-     * @return Returns the primaryIndicator.
-     */
-    public boolean isPrimaryIndicator() {
-        return primaryIndicator;
-    }
-
-    /**
-     * @param primaryIndicator The primaryIndicator to set.
-     */
-    public void setPrimaryIndicator(boolean primaryIndicator) {
-        this.primaryIndicator = primaryIndicator;
-    }
-    
-    /**
-     * @return Returns the primaryIndicatorName.
-     */
-    public String getPrimaryIndicatorName() {
-        return primaryIndicatorName;
-    }
-    
-    /**
-     * @param primaryIndicatorName The primaryIndicatorName to set.
-     */
-    public void setPrimaryIndicatorName(String primaryIndicatorName) {
-        this.primaryIndicatorName = primaryIndicatorName;
-    }
-    
-    /**
-     * Calc processing stats
-     */
-    public boolean isDoProcessingStats() {
-        return doProcessingStats;
-    }
-
-    /**
-     * Set Calc processing stats
-     * @see #resetStatistics()
-     */
-    public void setDoProcessingStats(boolean doProcessingStats) {
-        this.doProcessingStats = doProcessingStats;
-    }
-
-    /**
-     * @return Returns the lastSendTime.
-     */
-    public long getLastSendTime() {
-        return lastSendTime;
-    }
-    
-    /**
-     * @return Returns the nrOfRequests.
-     */
-    public long getNrOfRequests() {
-        return nrOfRequests;
-    }
-    
-    /**
-     * @return Returns the nrOfFilterRequests.
-     */
-    public long getNrOfFilterRequests() {
-        return nrOfFilterRequests;
-    }
-
-    /**
-     * @return Returns the nrOfCrossContextSendRequests.
-     */
-    public long getNrOfCrossContextSendRequests() {
-        return nrOfCrossContextSendRequests;
-    }
-
-    /**
-     * @return Returns the nrOfSendRequests.
-     */
-    public long getNrOfSendRequests() {
-        return nrOfSendRequests;
-    }
-
-    /**
-     * @return Returns the totalRequestTime.
-     */
-    public long getTotalRequestTime() {
-        return totalRequestTime;
-    }
-    
-    /**
-     * @return Returns the totalSendTime.
-     */
-    public long getTotalSendTime() {
-        return totalSendTime;
-    }
-
-    /**
-     * @return Returns the reqFilters.
-     */
-    protected java.util.regex.Pattern[] getReqFilters() {
-        return reqFilters;
-    }
-    
-    /**
-     * @param reqFilters The reqFilters to set.
-     */
-    protected void setReqFilters(java.util.regex.Pattern[] reqFilters) {
-        this.reqFilters = reqFilters;
-    }
-    
-    
-    // --------------------------------------------------------- Public Methods
-    
-    /**
-     * Register all cross context sessions inside endAccess.
-     * Use a list with contains check, that the Portlet API can include a lot 
of fragments from same or
-     * different applications with session changes.
-     *
-     * @param session cross context session
-     */
-    public void registerReplicationSession(DeltaSession session) {
-        List sessions = (List)crossContextSessions.get();
-        if(sessions != null) {
-            if(!sessions.contains(session)) {
-                if(log.isDebugEnabled())
-                    
log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
-                        session.getIdInternal(),
-                        session.getManager().getContainer().getName()));
-                sessions.add(session);
-            }
-        }
-    }
-
-    /**
-     * Log the interesting request parameters, invoke the next Valve in the
-     * sequence, and log the interesting response parameters.
-     *
-     * @param request The servlet request to be processed
-     * @param response The servlet response to be created
-     *
-     * @exception IOException if an input/output error occurs
-     * @exception ServletException if a servlet error occurs
-     */
-    public void invoke(Request request, Response response)
-        throws IOException, ServletException
-    {
-        long totalstart = 0;
-
-        //this happens before the request
-        if(isDoProcessingStats()) {
-            totalstart = System.currentTimeMillis();
-        }
-        if (primaryIndicator) {
-            createPrimaryIndicator(request) ;
-        }
-        Context context = request.getContext();
-        boolean isCrossContext = context != null
-                && context instanceof StandardContext
-                && ((StandardContext) context).getCrossContext();
-        try {
-            if(isCrossContext) {
-                if(log.isDebugEnabled())
-                    
log.debug(sm.getString("ReplicationValve.crossContext.add"));
-                //FIXME add Pool of Arraylists
-                crossContextSessions.set(new ArrayList());
-            }
-            getNext().invoke(request, response);
-            Manager manager = request.getContext().getManager();
-            if (manager != null && manager instanceof ClusterManager) {
-                ClusterManager clusterManager = (ClusterManager) manager;
-                CatalinaCluster containerCluster = (CatalinaCluster) 
getContainer().getCluster();
-                if (containerCluster == null) {
-                    if (log.isWarnEnabled())
-                        log.warn(sm.getString("ReplicationValve.nocluster"));
-                    return;
-                }
-                // valve cluster can access manager - other cluster handle 
replication 
-                // at host level - hopefully!
-                if(containerCluster.getManager(clusterManager.getName()) == 
null)
-                    return ;
-                if(containerCluster.hasMembers()) {
-                    sendReplicationMessage(request, totalstart, 
isCrossContext, clusterManager, containerCluster);
-                } else {
-                    resetReplicationRequest(request,isCrossContext);
-                }        
-            }
-        } finally {
-            // Array must be remove: Current master request send endAccess at 
recycle. 
-            // Don't register this request session again!
-            if(isCrossContext) {
-                if(log.isDebugEnabled())
-                    
log.debug(sm.getString("ReplicationValve.crossContext.remove"));
-                // crossContextSessions.remove() only exist at Java 5
-                // register ArrayList at a pool
-                crossContextSessions.set(null);
-            }
-        }
-    }
-
-    
-    /**
-     * reset the active statitics 
-     */
-    public void resetStatistics() {
-        totalRequestTime = 0 ;
-        totalSendTime = 0 ;
-        lastSendTime = 0 ;
-        nrOfFilterRequests = 0 ;
-        nrOfRequests = 0 ;
-        nrOfSendRequests = 0;
-        nrOfCrossContextSendRequests = 0;
-    }
-    
-    /**
-     * Return a String rendering of this object.
-     */
-    public String toString() {
-
-        StringBuffer sb = new StringBuffer("ReplicationValve[");
-        if (container != null)
-            sb.append(container.getName());
-        sb.append("]");
-        return (sb.toString());
-
-    }
-
-    // --------------------------------------------------------- Protected 
Methods
-
-    /**
-     * @param request
-     * @param totalstart
-     * @param isCrossContext
-     * @param clusterManager
-     * @param containerCluster
-     */
-    protected void sendReplicationMessage(Request request, long totalstart, 
boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster 
containerCluster) {
-        //this happens after the request
-        long start = 0;
-        if(isDoProcessingStats()) {
-            start = System.currentTimeMillis();
-        }
-        try {
-            // send invalid sessions
-            // DeltaManager returns String[0]
-            if (!(clusterManager instanceof DeltaManager))
-                sendInvalidSessions(clusterManager, containerCluster);
-            // send replication
-            sendSessionReplicationMessage(request, clusterManager, 
containerCluster);
-            if(isCrossContext)
-                sendCrossContextSession(containerCluster);
-        } catch (Exception x) {
-            // FIXME we have a lot of sends, but the trouble with one node 
stops the correct replication to other nodes!
-            log.error(sm.getString("ReplicationValve.send.failure"), x);
-        } finally {
-            // FIXME this stats update are not cheap!!
-            if(isDoProcessingStats()) {
-                updateStats(totalstart,start);
-            }
-        }
-    }
-
-    /**
-     * Send all changed cross context sessions to backups
-     * @param containerCluster
-     */
-    protected void sendCrossContextSession(CatalinaCluster containerCluster) {
-        Object sessions = crossContextSessions.get();
-        if(sessions != null && sessions instanceof List
-                && ((List)sessions).size() >0) {
-            for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) 
{          
-                Session session = (Session)iter.next();
-                if(log.isDebugEnabled())
-                    
log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",  
-                            session.getManager().getContainer().getName() ));
-                
sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
-                if(isDoProcessingStats()) {
-                    nrOfCrossContextSendRequests++;
-                }
-            }
-        }
-    }
-  
-    /**
-     * Fix memory leak for long sessions with many changes, when no backup 
member exists!
-     * @param request current request after responce is generated
-     * @param isCrossContext check crosscontext threadlocal
-     */
-    protected void resetReplicationRequest(Request request, boolean 
isCrossContext) {
-        Session contextSession = request.getSessionInternal(false);
-        if(contextSession != null & contextSession instanceof DeltaSession){
-            resetDeltaRequest(contextSession);
-            ((DeltaSession)contextSession).setPrimarySession(true);
-        }
-        if(isCrossContext) {
-            Object sessions = crossContextSessions.get();
-            if(sessions != null && sessions instanceof List
-               && ((List)sessions).size() >0) {
-                Iterator iter = ((List)sessions).iterator();
-                for(; iter.hasNext() ;) {          
-                    Session session = (Session)iter.next();
-                    resetDeltaRequest(session);
-                    if(session instanceof DeltaSession)
-                        ((DeltaSession)contextSession).setPrimarySession(true);
-
-                }
-            }
-        }                     
-    }
-
-    /**
-     * Reset DeltaRequest from session
-     * @param session HttpSession from current request or cross context session
-     */
-    protected void resetDeltaRequest(Session session) {
-        if(log.isDebugEnabled()) {
-            log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , 
-                session.getManager().getContainer().getName() ));
-        }
-        ((DeltaSession)session).resetDeltaRequest();
-    }
-
-    /**
-     * Send Cluster Replication Request
-     * @param request current request
-     * @param manager session manager
-     * @param cluster replication cluster
-     */
-    protected void sendSessionReplicationMessage(Request request,
-            ClusterManager manager, CatalinaCluster cluster) {
-        Session session = request.getSessionInternal(false);
-        if (session != null) {
-            String uri = request.getDecodedRequestURI();
-            // request without session change
-            if (!isRequestWithoutSessionChange(uri)) {
-                if (log.isDebugEnabled())
-                    log.debug(sm.getString("ReplicationValve.invoke.uri", 
uri));
-                sendMessage(session,manager,cluster);
-            } else
-                if(isDoProcessingStats())
-                    nrOfFilterRequests++;
-        }
-
-    }
-
-   /**
-    * Send message delta message from request session 
-    * @param request current request
-    * @param manager session manager
-    * @param cluster replication cluster
-    */
-    protected void sendMessage(Session session,
-             ClusterManager manager, CatalinaCluster cluster) {
-        String id = session.getIdInternal();
-        if (id != null) {
-            send(manager, cluster, id);
-        }
-    }
-
-    /**
-     * send manager requestCompleted message to cluster
-     * @param manager SessionManager
-     * @param cluster replication cluster
-     * @param sessionId sessionid from the manager
-     * @see DeltaManager#requestCompleted(String)
-     * @see SimpleTcpCluster#send(ClusterMessage)
-     */
-    protected void send(ClusterManager manager, CatalinaCluster cluster, 
String sessionId) {
-        ClusterMessage msg = manager.requestCompleted(sessionId);
-        if (msg != null) {
-            if(manager.isSendClusterDomainOnly()) {
-                cluster.sendClusterDomain(msg);
-            } else {
-                cluster.send(msg);
-            }
-            if(isDoProcessingStats())
-                nrOfSendRequests++;
-        }
-    }
-    
-    /**
-     * check for session invalidations
-     * @param manager
-     * @param cluster
-     */
-    protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster 
cluster) {
-        String[] invalidIds=manager.getInvalidatedSessions();
-        if ( invalidIds.length > 0 ) {
-            for ( int i=0;i<invalidIds.length; i++ ) {
-                try {
-                    send(manager,cluster,invalidIds[i]);
-                } catch ( Exception x ) {
-                    
log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
-                }
-            }
-        }
-    }
-    
-    /**
-     * is request without possible session change
-     * @param uri The request uri
-     * @return True if no session change
-     */
-    protected boolean isRequestWithoutSessionChange(String uri) {
-
-        boolean filterfound = false;
-
-        for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {
-            java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);
-            filterfound = matcher.matches();
-        }
-        return filterfound;
-    }
-
-    /**
-     * protocol cluster replications stats
-     * @param requestTime
-     * @param clusterTime
-     */
-    protected  void updateStats(long requestTime, long clusterTime) {
-        synchronized(this) {
-            lastSendTime=System.currentTimeMillis();
-            totalSendTime+=lastSendTime - clusterTime;
-            totalRequestTime+=lastSendTime - requestTime;
-            nrOfRequests++;
-        }
-        if(log.isInfoEnabled()) {
-            if ( (nrOfRequests % 100) == 0 ) {
-                 log.info(sm.getString("ReplicationValve.stats",
-                     new Object[]{
-                         new Long(totalRequestTime/nrOfRequests),
-                         new Long(totalSendTime/nrOfRequests),
-                         new Long(nrOfRequests),
-                         new Long(nrOfSendRequests),
-                         new Long(nrOfCrossContextSendRequests),
-                         new Long(nrOfFilterRequests),
-                         new Long(totalRequestTime),
-                         new Long(totalSendTime)}));
-             }
-        }
-    }
-
-
-    /**
-     * Mark Request that processed at primary node with attribute
-     * primaryIndicatorName
-     * 
-     * @param request
-     * @throws IOException
-     */
-    protected void createPrimaryIndicator(Request request) throws IOException {
-        String id = request.getRequestedSessionId();
-        if ((id != null) && (id.length() > 0)) {
-            Manager manager = request.getContext().getManager();
-            Session session = manager.findSession(id);
-            if (session instanceof ClusterSession) {
-                ClusterSession cses = (ClusterSession) session;
-                if (cses != null) {
-                    Boolean isPrimary = new Boolean(cses.isPrimarySession());
-                    if (log.isDebugEnabled())
-                        log.debug(sm.getString(
-                                "ReplicationValve.session.indicator", 
request.getContext().getName(),id,
-                                primaryIndicatorName, isPrimary));
-                    request.setAttribute(primaryIndicatorName, isPrimary);
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    if (session != null) {
-                        log.debug(sm.getString(
-                                "ReplicationValve.session.found", 
request.getContext().getName(),id));
-                    } else {
-                        log.debug(sm.getString(
-                                "ReplicationValve.session.invalid", 
request.getContext().getName(),id));
-                    }
-                }
-            }
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.ha.tcp;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+import java.util.regex.Pattern;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import javax.servlet.ServletException;
+
+import org.apache.catalina.Manager;
+import org.apache.catalina.Session;
+import org.apache.catalina.Context;
+import org.apache.catalina.core.StandardContext;
+import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
+import org.apache.catalina.ha.ClusterMessage;
+import org.apache.catalina.ha.ClusterSession;
+import org.apache.catalina.ha.ClusterValve;
+import org.apache.catalina.ha.session.DeltaManager;
+import org.apache.catalina.ha.session.DeltaSession;
+import org.apache.catalina.connector.Request;
+import org.apache.catalina.connector.Response;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.valves.ValveBase;
+
+/**
+ * <p>Implementation of a Valve that logs interesting contents from the
+ * specified Request (before processing) and the corresponding Response
+ * (after processing).  It is especially useful in debugging problems
+ * related to headers and cookies.</p>
+ *
+ * <p>This Valve may be attached to any Container, depending on the granularity
+ * of the logging you wish to perform.</p>
+ *
+ * <p>primaryIndicator=true, then the request attribute 
<i>org.apache.catalina.ha.tcp.isPrimarySession.</i>
+ * is set true, when request processing is at sessions primary node.
+ * </p>
+ *
+ * @author Craig R. McClanahan
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 375709 $ $Date: 2006-02-07 15:13:25 -0600 (Tue, 07 Feb 
2006) $
+ */
+
+public class ReplicationValve
+    extends ValveBase implements ClusterValve {
+    
+    private static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( ReplicationValve.class );
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * The descriptive information related to this implementation.
+     */
+    private static final String info =
+        "org.apache.catalina.ha.tcp.ReplicationValve/2.0";
+
+
+    /**
+     * The StringManager for this package.
+     */
+    protected static StringManager sm =
+        StringManager.getManager(Constants.Package);
+
+    private CatalinaCluster cluster = null ;
+
+    /**
+     * holds file endings to not call for like images and others
+     */
+    protected java.util.regex.Pattern[] reqFilters = new 
java.util.regex.Pattern[0];
+    
+    /**
+     * Orginal filter 
+     */
+    protected String filter ;
+    
+    /**
+     * crossContext session container 
+     */
+    protected ThreadLocal crossContextSessions = new ThreadLocal() ;
+    
+    /**
+     * doProcessingStats (default = off)
+     */
+    protected boolean doProcessingStats = false;
+    
+    protected long totalRequestTime = 0;
+    protected long totalSendTime = 0;
+    protected long nrOfRequests = 0;
+    protected long lastSendTime = 0;
+    protected long nrOfFilterRequests = 0;
+    protected long nrOfSendRequests = 0;
+    protected long nrOfCrossContextSendRequests = 0;
+    
+    /**
+     * must primary change indicator set 
+     */
+    protected boolean primaryIndicator = false ;
+    
+    /**
+     * Name of primary change indicator as request attribute
+     */
+    protected String primaryIndicatorName = 
"org.apache.catalina.ha.tcp.isPrimarySession";
+   
+    // ------------------------------------------------------------- Properties
+
+    public ReplicationValve() {
+    }
+    
+    /**
+     * Return descriptive information about this Valve implementation.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+    
+    /**
+     * @return Returns the cluster.
+     */
+    public CatalinaCluster getCluster() {
+        return cluster;
+    }
+    
+    /**
+     * @param cluster The cluster to set.
+     */
+    public void setCluster(CatalinaCluster cluster) {
+        this.cluster = cluster;
+    }
+ 
+    /**
+     * @return Returns the filter
+     */
+    public String getFilter() {
+       return filter ;
+    }
+
+    /**
+     * compile filter string to regular expressions
+     * @see Pattern#compile(java.lang.String)
+     * @param filter
+     *            The filter to set.
+     */
+    public void setFilter(String filter) {
+        if (log.isDebugEnabled())
+            log.debug(sm.getString("ReplicationValve.filter.loading", filter));
+        this.filter = filter;
+        StringTokenizer t = new StringTokenizer(filter, ";");
+        this.reqFilters = new Pattern[t.countTokens()];
+        int i = 0;
+        while (t.hasMoreTokens()) {
+            String s = t.nextToken();
+            if (log.isTraceEnabled())
+                log.trace(sm.getString("ReplicationValve.filter.token", s));
+            try {
+                reqFilters[i++] = Pattern.compile(s);
+            } catch (Exception x) {
+                log.error(sm.getString("ReplicationValve.filter.token.failure",
+                        s), x);
+            }
+        }
+    }
+
+    /**
+     * @return Returns the primaryIndicator.
+     */
+    public boolean isPrimaryIndicator() {
+        return primaryIndicator;
+    }
+
+    /**
+     * @param primaryIndicator The primaryIndicator to set.
+     */
+    public void setPrimaryIndicator(boolean primaryIndicator) {
+        this.primaryIndicator = primaryIndicator;
+    }
+    
+    /**
+     * @return Returns the primaryIndicatorName.
+     */
+    public String getPrimaryIndicatorName() {
+        return primaryIndicatorName;
+    }
+    
+    /**
+     * @param primaryIndicatorName The primaryIndicatorName to set.
+     */
+    public void setPrimaryIndicatorName(String primaryIndicatorName) {
+        this.primaryIndicatorName = primaryIndicatorName;
+    }
+    
+    /**
+     * Calc processing stats
+     */
+    public boolean isDoProcessingStats() {
+        return doProcessingStats;
+    }
+
+    /**
+     * Set Calc processing stats
+     * @see #resetStatistics()
+     */
+    public void setDoProcessingStats(boolean doProcessingStats) {
+        this.doProcessingStats = doProcessingStats;
+    }
+
+    /**
+     * @return Returns the lastSendTime.
+     */
+    public long getLastSendTime() {
+        return lastSendTime;
+    }
+    
+    /**
+     * @return Returns the nrOfRequests.
+     */
+    public long getNrOfRequests() {
+        return nrOfRequests;
+    }
+    
+    /**
+     * @return Returns the nrOfFilterRequests.
+     */
+    public long getNrOfFilterRequests() {
+        return nrOfFilterRequests;
+    }
+
+    /**
+     * @return Returns the nrOfCrossContextSendRequests.
+     */
+    public long getNrOfCrossContextSendRequests() {
+        return nrOfCrossContextSendRequests;
+    }
+
+    /**
+     * @return Returns the nrOfSendRequests.
+     */
+    public long getNrOfSendRequests() {
+        return nrOfSendRequests;
+    }
+
+    /**
+     * @return Returns the totalRequestTime.
+     */
+    public long getTotalRequestTime() {
+        return totalRequestTime;
+    }
+    
+    /**
+     * @return Returns the totalSendTime.
+     */
+    public long getTotalSendTime() {
+        return totalSendTime;
+    }
+
+    /**
+     * @return Returns the reqFilters.
+     */
+    protected java.util.regex.Pattern[] getReqFilters() {
+        return reqFilters;
+    }
+    
+    /**
+     * @param reqFilters The reqFilters to set.
+     */
+    protected void setReqFilters(java.util.regex.Pattern[] reqFilters) {
+        this.reqFilters = reqFilters;
+    }
+    
+    
+    // --------------------------------------------------------- Public Methods
+    
+    /**
+     * Register all cross context sessions inside endAccess.
+     * Use a list with contains check, that the Portlet API can include a lot 
of fragments from same or
+     * different applications with session changes.
+     *
+     * @param session cross context session
+     */
+    public void registerReplicationSession(DeltaSession session) {
+        List sessions = (List)crossContextSessions.get();
+        if(sessions != null) {
+            if(!sessions.contains(session)) {
+                if(log.isDebugEnabled())
+                    
log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
+                        session.getIdInternal(),
+                        session.getManager().getContainer().getName()));
+                sessions.add(session);
+            }
+        }
+    }
+
+    /**
+     * Log the interesting request parameters, invoke the next Valve in the
+     * sequence, and log the interesting response parameters.
+     *
+     * @param request The servlet request to be processed
+     * @param response The servlet response to be created
+     *
+     * @exception IOException if an input/output error occurs
+     * @exception ServletException if a servlet error occurs
+     */
+    public void invoke(Request request, Response response)
+        throws IOException, ServletException
+    {
+        long totalstart = 0;
+
+        //this happens before the request
+        if(isDoProcessingStats()) {
+            totalstart = System.currentTimeMillis();
+        }
+        if (primaryIndicator) {
+            createPrimaryIndicator(request) ;
+        }
+        Context context = request.getContext();
+        boolean isCrossContext = context != null
+                && context instanceof StandardContext
+                && ((StandardContext) context).getCrossContext();
+        try {
+            if(isCrossContext) {
+                if(log.isDebugEnabled())
+                    
log.debug(sm.getString("ReplicationValve.crossContext.add"));
+                //FIXME add Pool of Arraylists
+                crossContextSessions.set(new ArrayList());
+            }
+            getNext().invoke(request, response);
+            Manager manager = request.getContext().getManager();
+            if (manager != null && manager instanceof ClusterManager) {
+                ClusterManager clusterManager = (ClusterManager) manager;
+                CatalinaCluster containerCluster = (CatalinaCluster) 
getContainer().getCluster();
+                if (containerCluster == null) {
+                    if (log.isWarnEnabled())
+                        log.warn(sm.getString("ReplicationValve.nocluster"));
+                    return;
+                }
+                // valve cluster can access manager - other cluster handle 
replication 
+                // at host level - hopefully!
+                if(containerCluster.getManager(clusterManager.getName()) == 
null)
+                    return ;
+                if(containerCluster.hasMembers()) {
+                    sendReplicationMessage(request, totalstart, 
isCrossContext, clusterManager, containerCluster);
+                } else {
+                    resetReplicationRequest(request,isCrossContext);
+                }        
+            }
+        } finally {
+            // Array must be remove: Current master request send endAccess at 
recycle. 
+            // Don't register this request session again!
+            if(isCrossContext) {
+                if(log.isDebugEnabled())
+                    
log.debug(sm.getString("ReplicationValve.crossContext.remove"));
+                // crossContextSessions.remove() only exist at Java 5
+                // register ArrayList at a pool
+                crossContextSessions.set(null);
+            }
+        }
+    }
+
+    
+    /**
+     * reset the active statitics 
+     */
+    public void resetStatistics() {
+        totalRequestTime = 0 ;
+        totalSendTime = 0 ;
+        lastSendTime = 0 ;
+        nrOfFilterRequests = 0 ;
+        nrOfRequests = 0 ;
+        nrOfSendRequests = 0;
+        nrOfCrossContextSendRequests = 0;
+    }
+    
+    /**
+     * Return a String rendering of this object.
+     */
+    public String toString() {
+
+        StringBuffer sb = new StringBuffer("ReplicationValve[");
+        if (container != null)
+            sb.append(container.getName());
+        sb.append("]");
+        return (sb.toString());
+
+    }
+
+    // --------------------------------------------------------- Protected 
Methods
+
+    /**
+     * @param request
+     * @param totalstart
+     * @param isCrossContext
+     * @param clusterManager
+     * @param containerCluster
+     */
+    protected void sendReplicationMessage(Request request, long totalstart, 
boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster 
containerCluster) {
+        //this happens after the request
+        long start = 0;
+        if(isDoProcessingStats()) {
+            start = System.currentTimeMillis();
+        }
+        try {
+            // send invalid sessions
+            // DeltaManager returns String[0]
+            if (!(clusterManager instanceof DeltaManager))
+                sendInvalidSessions(clusterManager, containerCluster);
+            // send replication
+            sendSessionReplicationMessage(request, clusterManager, 
containerCluster);
+            if(isCrossContext)
+                sendCrossContextSession(containerCluster);
+        } catch (Exception x) {
+            // FIXME we have a lot of sends, but the trouble with one node 
stops the correct replication to other nodes!
+            log.error(sm.getString("ReplicationValve.send.failure"), x);
+        } finally {
+            // FIXME this stats update are not cheap!!
+            if(isDoProcessingStats()) {
+                updateStats(totalstart,start);
+            }
+        }
+    }
+
+    /**
+     * Send all changed cross context sessions to backups
+     * @param containerCluster
+     */
+    protected void sendCrossContextSession(CatalinaCluster containerCluster) {
+        Object sessions = crossContextSessions.get();
+        if(sessions != null && sessions instanceof List
+                && ((List)sessions).size() >0) {
+            for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) 
{          
+                Session session = (Session)iter.next();
+                if(log.isDebugEnabled())
+                    
log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",  
+                            session.getManager().getContainer().getName() ));
+                
sendMessage(session,(ClusterManager)session.getManager(),containerCluster);
+                if(isDoProcessingStats()) {
+                    nrOfCrossContextSendRequests++;
+                }
+            }
+        }
+    }
+  
+    /**
+     * Fix memory leak for long sessions with many changes, when no backup 
member exists!
+     * @param request current request after responce is generated
+     * @param isCrossContext check crosscontext threadlocal
+     */
+    protected void resetReplicationRequest(Request request, boolean 
isCrossContext) {
+        Session contextSession = request.getSessionInternal(false);
+        if(contextSession != null & contextSession instanceof DeltaSession){
+            resetDeltaRequest(contextSession);
+            ((DeltaSession)contextSession).setPrimarySession(true);
+        }
+        if(isCrossContext) {
+            Object sessions = crossContextSessions.get();
+            if(sessions != null && sessions instanceof List
+               && ((List)sessions).size() >0) {
+                Iterator iter = ((List)sessions).iterator();
+                for(; iter.hasNext() ;) {          
+                    Session session = (Session)iter.next();
+                    resetDeltaRequest(session);
+                    if(session instanceof DeltaSession)
+                        ((DeltaSession)contextSession).setPrimarySession(true);
+
+                }
+            }
+        }                     
+    }
+
+    /**
+     * Reset DeltaRequest from session
+     * @param session HttpSession from current request or cross context session
+     */
+    protected void resetDeltaRequest(Session session) {
+        if(log.isDebugEnabled()) {
+            log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , 
+                session.getManager().getContainer().getName() ));
+        }
+        ((DeltaSession)session).resetDeltaRequest();
+    }
+
+    /**
+     * Send Cluster Replication Request
+     * @param request current request
+     * @param manager session manager
+     * @param cluster replication cluster
+     */
+    protected void sendSessionReplicationMessage(Request request,
+            ClusterManager manager, CatalinaCluster cluster) {
+        Session session = request.getSessionInternal(false);
+        if (session != null) {
+            String uri = request.getDecodedRequestURI();
+            // request without session change
+            if (!isRequestWithoutSessionChange(uri)) {
+                if (log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.invoke.uri", 
uri));
+                sendMessage(session,manager,cluster);
+            } else
+                if(isDoProcessingStats())
+                    nrOfFilterRequests++;
+        }
+
+    }
+
+   /**
+    * Send message delta message from request session 
+    * @param request current request
+    * @param manager session manager
+    * @param cluster replication cluster
+    */
+    protected void sendMessage(Session session,
+             ClusterManager manager, CatalinaCluster cluster) {
+        String id = session.getIdInternal();
+        if (id != null) {
+            send(manager, cluster, id);
+        }
+    }
+
+    /**
+     * send manager requestCompleted message to cluster
+     * @param manager SessionManager
+     * @param cluster replication cluster
+     * @param sessionId sessionid from the manager
+     * @see DeltaManager#requestCompleted(String)
+     * @see SimpleTcpCluster#send(ClusterMessage)
+     */
+    protected void send(ClusterManager manager, CatalinaCluster cluster, 
String sessionId) {
+        ClusterMessage msg = manager.requestCompleted(sessionId);
+        if (msg != null) {
+            if(manager.isSendClusterDomainOnly()) {
+                cluster.sendClusterDomain(msg);
+            } else {
+                cluster.send(msg);
+            }
+            if(isDoProcessingStats())
+                nrOfSendRequests++;
+        }
+    }
+    
+    /**
+     * check for session invalidations
+     * @param manager
+     * @param cluster
+     */
+    protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster 
cluster) {
+        String[] invalidIds=manager.getInvalidatedSessions();
+        if ( invalidIds.length > 0 ) {
+            for ( int i=0;i<invalidIds.length; i++ ) {
+                try {
+                    send(manager,cluster,invalidIds[i]);
+                } catch ( Exception x ) {
+                    
log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x);
+                }
+            }
+        }
+    }
+    
+    /**
+     * is request without possible session change
+     * @param uri The request uri
+     * @return True if no session change
+     */
+    protected boolean isRequestWithoutSessionChange(String uri) {
+
+        boolean filterfound = false;
+
+        for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {
+            java.util.regex.Matcher matcher = reqFilters[i].matcher(uri);
+            filterfound = matcher.matches();
+        }
+        return filterfound;
+    }
+
+    /**
+     * protocol cluster replications stats
+     * @param requestTime
+     * @param clusterTime
+     */
+    protected  void updateStats(long requestTime, long clusterTime) {
+        synchronized(this) {
+            lastSendTime=System.currentTimeMillis();
+            totalSendTime+=lastSendTime - clusterTime;
+            totalRequestTime+=lastSendTime - requestTime;
+            nrOfRequests++;
+        }
+        if(log.isInfoEnabled()) {
+            if ( (nrOfRequests % 100) == 0 ) {
+                 log.info(sm.getString("ReplicationValve.stats",
+                     new Object[]{
+                         new Long(totalRequestTime/nrOfRequests),
+                         new Long(totalSendTime/nrOfRequests),
+                         new Long(nrOfRequests),
+                         new Long(nrOfSendRequests),
+                         new Long(nrOfCrossContextSendRequests),
+                         new Long(nrOfFilterRequests),
+                         new Long(totalRequestTime),
+                         new Long(totalSendTime)}));
+             }
+        }
+    }
+
+
+    /**
+     * Mark Request that processed at primary node with attribute
+     * primaryIndicatorName
+     * 
+     * @param request
+     * @throws IOException
+     */
+    protected void createPrimaryIndicator(Request request) throws IOException {
+        String id = request.getRequestedSessionId();
+        if ((id != null) && (id.length() > 0)) {
+            Manager manager = request.getContext().getManager();
+            Session session = manager.findSession(id);
+            if (session instanceof ClusterSession) {
+                ClusterSession cses = (ClusterSession) session;
+                if (cses != null) {
+                    Boolean isPrimary = new Boolean(cses.isPrimarySession());
+                    if (log.isDebugEnabled())
+                        log.debug(sm.getString(
+                                "ReplicationValve.session.indicator", 
request.getContext().getName(),id,
+                                primaryIndicatorName, isPrimary));
+                    request.setAttribute(primaryIndicatorName, isPrimary);
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    if (session != null) {
+                        log.debug(sm.getString(
+                                "ReplicationValve.session.found", 
request.getContext().getName(),id));
+                    } else {
+                        log.debug(sm.getString(
+                                "ReplicationValve.session.invalid", 
request.getContext().getName(),id));
+                    }
+                }
+            }
+        }
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to