Author: pero Date: Thu Jan 5 11:34:04 2006 New Revision: 366255 URL: http://svn.apache.org/viewcvs?rev=366255&view=rev Log: Add support for cross context session replication
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java?rev=366255&r1=366254&r2=366255&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Thu Jan 5 11:34:04 2006 @@ -34,6 +34,7 @@ import org.apache.catalina.Context; import org.apache.catalina.Engine; import org.apache.catalina.Host; +import org.apache.catalina.Valve; import org.apache.catalina.Lifecycle; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleListener; @@ -44,9 +45,11 @@ import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.Member; import org.apache.catalina.session.ManagerBase; +import org.apache.catalina.cluster.tcp.ReplicationValve; import org.apache.catalina.util.CustomObjectInputStream; import org.apache.catalina.util.LifecycleSupport; import org.apache.catalina.util.StringManager; +import org.apache.catalina.core.StandardContext; /** * The DeltaManager manages replicated sessions by only replicating the deltas @@ -86,7 +89,7 @@ /** * The descriptive information about this implementation. */ - private static final String info = "DeltaManager/2.0"; + private static final String info = "DeltaManager/2.1"; /** * Has this component been started yet? @@ -105,6 +108,11 @@ private CatalinaCluster cluster = null; /** + * cached replication valve cluster container! + */ + private ReplicationValve replicationValve = null ; + + /** * The lifecycle event support for this component. */ protected LifecycleSupport lifecycle = new LifecycleSupport(this); @@ -120,7 +128,7 @@ private boolean notifySessionListenersOnReplication = true; - private boolean stateTransferred = false ; + private boolean stateTransfered = false ; private int stateTransferTimeout = 60; @@ -364,12 +372,20 @@ this.stateTransferTimeout = timeoutAllSession; } - public boolean getStateTransferred() { - return stateTransferred; + /** + * is session state transfered complete? + * + */ + public boolean getStateTransfered() { + return stateTransfered; } - public void setStateTransferred(boolean stateTransferred) { - this.stateTransferred = stateTransferred; + /** + * set that state ist complete transfered + * @param stateTransfered + */ + public void setStateTransfered(boolean stateTransfered) { + this.stateTransfered = stateTransfered; } /** @@ -439,6 +455,7 @@ } /** + * * @return Returns the sendAllSessions. */ public boolean isSendAllSessions() { @@ -917,7 +934,7 @@ lifecycle.fireLifecycleEvent(START_EVENT, null); // Force initialization of the random number generator - String dummy = generateSessionId(); + generateSessionId(); // Load unloaded sessions, if any try { @@ -995,7 +1012,7 @@ stateTransferCreateSendTime = beforeSendTime ; // request session state counterSend_EVT_GET_ALL_SESSIONS++; - stateTransferred = false ; + stateTransfered = false ; // FIXME This send call block the deploy thread, when sender waitForAck is enabled try { synchronized(receivedMessageQueue) { @@ -1048,6 +1065,35 @@ } /** + * Register cross context session at replication valve thread local + * @param session cross context session + */ + protected void registerSessionAtReplicationValve(DeltaSession session) { + if(replicationValve == null) { + if(container instanceof StandardContext + && ((StandardContext)container).getCrossContext()) { + Cluster cluster = getCluster() ; + if(cluster != null && cluster instanceof CatalinaCluster) { + Valve[] valves = ((CatalinaCluster)cluster).getValves(); + if(valves != null && valves.length > 0) { + for(int i=0; replicationValve == null && i < valves.length ; i++ ){ + if(valves[i] instanceof ReplicationValve) + replicationValve = (ReplicationValve)valves[i] ; + } + + if(replicationValve == null && log.isDebugEnabled()) { + log.debug("no ReplicationValve found for CrossContext Support"); + } + } + } + } + } + if(replicationValve != null) { + replicationValve.registerReplicationSession(session); + } + } + + /** * Find the master of the session state * @return master member of sessions */ @@ -1062,7 +1108,6 @@ mbr = member ; } } else { - // FIXME Why only the first Member? if(mbrs.length != 0 ) mbr = mbrs[0]; } @@ -1092,7 +1137,7 @@ } reqNow = System.currentTimeMillis(); isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout())); - } while ((!getStateTransferred()) && (!isTimeout)); + } while ((!getStateTransfered()) && (!isTimeout)); } else { if(getStateTransferTimeout() == -1) { // wait that state is transfered @@ -1101,11 +1146,11 @@ Thread.sleep(100); } catch (Exception sleep) { } - } while ((!getStateTransferred())); + } while ((!getStateTransfered())); reqNow = System.currentTimeMillis(); } } - if (isTimeout || (!getStateTransferred())) { + if (isTimeout || (!getStateTransfered())) { counterNoStateTransfered++ ; log.error(sm.getString("deltaManager.noSessionState", getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime))); @@ -1156,6 +1201,7 @@ // Require a new random number generator if we are restarted this.random = null; getCluster().removeManager(getName(),this); + replicationValve = null; if (initialized) { destroy(); } @@ -1174,8 +1220,6 @@ // Validate the source of this event if (!(event.getSource() instanceof Context)) return; - Context context = (Context) event.getSource(); - // Process a relevant property change if (event.getPropertyName().equals("sessionTimeout")) { try { @@ -1243,29 +1287,35 @@ DeltaSession session = (DeltaSession) findSession(sessionId); DeltaRequest deltaRequest = session.getDeltaRequest(); SessionMessage msg = null; - if (deltaRequest.getSize() > 0) { - - counterSend_EVT_SESSION_DELTA++; - byte[] data = unloadDeltaRequest(deltaRequest); - msg = new SessionMessageImpl(name, - SessionMessage.EVT_SESSION_DELTA, data, sessionId, - sessionId + "-" + System.currentTimeMillis()); - session.resetDeltaRequest(); - if (log.isDebugEnabled()) { - log.debug(sm.getString( - "deltaManager.createMessage.delta", - getName(), sessionId)); - } - - } else if (!session.isPrimarySession()) { - counterSend_EVT_SESSION_ACCESSED++; - msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, - sessionId + "-" + System.currentTimeMillis()); + boolean isDeltaRequest = false ; + synchronized(deltaRequest) { + isDeltaRequest = deltaRequest.getSize() > 0 ; + if (isDeltaRequest) { + counterSend_EVT_SESSION_DELTA++; + byte[] data = unloadDeltaRequest(deltaRequest); + msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_DELTA, data, sessionId, + sessionId + "-" + System.currentTimeMillis()); + session.resetDeltaRequest(); + } + } + if(!isDeltaRequest) { + if(!session.isPrimarySession()) { + counterSend_EVT_SESSION_ACCESSED++; + msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, + sessionId + "-" + System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(sm.getString( + "deltaManager.createMessage.accessChangePrimary", + getName(), sessionId)); + } + } + } else { // log only outside synch block! if (log.isDebugEnabled()) { log.debug(sm.getString( - "deltaManager.createMessage.accessChangePrimary", - getName(), sessionId)); + "deltaManager.createMessage.delta", + getName(), sessionId)); } } session.setPrimarySession(true); @@ -1493,7 +1543,7 @@ "deltaManager.receiveMessage.transfercomplete", getName(), sender.getHost(), new Integer(sender.getPort()))); stateTransferCreateSendTime = msg.getTimestamp() ; - stateTransferred = true ; + stateTransfered = true ; } /** Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java?rev=366255&r1=366254&r2=366255&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java Thu Jan 5 11:34:04 2006 @@ -42,12 +42,14 @@ import javax.servlet.http.HttpSessionEvent; import javax.servlet.http.HttpSessionListener; +import org.apache.catalina.Container; import org.apache.catalina.Context; import org.apache.catalina.Manager; import org.apache.catalina.Session; import org.apache.catalina.SessionEvent; import org.apache.catalina.SessionListener; import org.apache.catalina.cluster.ClusterSession; +import org.apache.catalina.core.StandardContext; import org.apache.catalina.realm.GenericPrincipal; import org.apache.catalina.util.Enumerator; import org.apache.catalina.util.StringManager; @@ -136,12 +138,6 @@ private long creationTime = 0L; /** - * The debugging detail level for this component. NOTE: This value is not - * included in the serialized version of this object. - */ - private transient int debug = 0; - - /** * We are currently processing a session expiration, so bypass certain * IllegalStateException tests. NOTE: This value is not included in the * serialized version of this object. @@ -162,7 +158,7 @@ /** * Descriptive information describing this Session implementation. */ - private static final String info = "DeltaSession/1.0"; + private static final String info = "DeltaSession/1.1"; /** * The last accessed time for this Session. @@ -426,9 +422,7 @@ } } } - }//end if - //end fix - + } } /** @@ -656,6 +650,8 @@ public void endAccess() { isNew = false; accessCount--; + if(manager instanceof DeltaManager) + ((DeltaManager)manager).registerSessionAtReplicationValve(this); } /** Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?rev=366255&r1=366254&r2=366255&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Thu Jan 5 11:34:04 2006 @@ -35,17 +35,22 @@ ReplicationTransmitter.setProperty=set property {0}: {1} old value {2} ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name {1} ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with name {1} +ReplicationValve.crossContext.add=add Cross Context session replication container to replicationValve threadlocal +ReplicationValve.crossContext.registerSession=register Cross context session id={0} from context {1} +ReplicationValve.crossContext.remove=remove Cross Context session replication container from replicationValve threadlocal +ReplicationValve.crossContext.sendDelta=send Cross Context session delta from context {0}. ReplicationValve.filter.loading=Loading request filters={0} ReplicationValve.filter.token=Request filter={0} ReplicationValve.filter.token.failure=Unable to compile filter={0} ReplicationValve.invoke.uri=Invoking replication request on {0} ReplicationValve.nocluster=No cluster configured for this request. +ReplicationValve.resetDeltaRequest=Cluster is standalone: reset Session Request Delta at context {0} ReplicationValve.send.failure=Unable to perform replication request. ReplicationValve.send.invalid.failure=Unable to send session [id={0}] invalid message over cluster. ReplicationValve.session.found=Context {0}: Found session {1} but it isn't a ClusterSession. ReplicationValve.session.indicator=Context {0}: Primarity of session {0} in request attribute {1} is {2}. ReplicationValve.session.invalid=Context {0}: Requested session {1} is invalid, removed or not replicated at this node. -ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests (Request={4} ms Cluster={5} ms). +ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests {4} send requests {5} cross context requests (Request={6} ms Cluster={7} ms). SimpleTcpCluster.event.log=Cluster receive listener event {0} with data {1} SimpleTcpCluster.getProperty=get property {0} SimpleTcpCluster.setProperty=set property {0}: {1} old value {2} Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java?rev=366255&r1=366254&r2=366255&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java Thu Jan 5 11:34:04 2006 @@ -19,17 +19,22 @@ import java.io.IOException; import java.util.StringTokenizer; import java.util.regex.Pattern; - +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; import javax.servlet.ServletException; import org.apache.catalina.Manager; import org.apache.catalina.Session; +import org.apache.catalina.Context; +import org.apache.catalina.core.StandardContext; import org.apache.catalina.cluster.CatalinaCluster; import org.apache.catalina.cluster.ClusterManager; import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.ClusterSession; import org.apache.catalina.cluster.ClusterValve; import org.apache.catalina.cluster.session.DeltaManager; +import org.apache.catalina.cluster.session.DeltaSession; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; import org.apache.catalina.util.StringManager; @@ -66,7 +71,7 @@ * The descriptive information related to this implementation. */ private static final String info = - "org.apache.catalina.cluster.tcp.ReplicationValve/1.2"; + "org.apache.catalina.cluster.tcp.ReplicationValve/2.0"; /** @@ -81,20 +86,45 @@ * holds file endings to not call for like images and others */ protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0]; + + /** + * Orginal filter + */ protected String filter ; - - protected long totalRequestTime=0; - protected long totalSendTime=0; - protected long nrOfRequests =0; - protected long lastSendTime =0; - protected long nrOfFilterRequests=0; + + /** + * crossContext session container + */ + protected ThreadLocal crossContextSessions = new ThreadLocal() ; + + /** + * doProcessingStats (default = off) + */ + protected boolean doProcessingStats = false; + + protected long totalRequestTime = 0; + protected long totalSendTime = 0; + protected long nrOfRequests = 0; + protected long lastSendTime = 0; + protected long nrOfFilterRequests = 0; + protected long nrOfSendRequests = 0; + protected long nrOfCrossContextSendRequests = 0; + + /** + * must primary change indicator set + */ protected boolean primaryIndicator = false ; + + /** + * Name of primary change indicator as request attributeâ + */ protected String primaryIndicatorName = "org.apache.catalina.cluster.tcp.isPrimarySession"; // ------------------------------------------------------------- Properties public ReplicationValve() { } + /** * Return descriptive information about this Valve implementation. */ @@ -157,25 +187,43 @@ public boolean isPrimaryIndicator() { return primaryIndicator; } + /** * @param primaryIndicator The primaryIndicator to set. */ public void setPrimaryIndicator(boolean primaryIndicator) { this.primaryIndicator = primaryIndicator; } + /** * @return Returns the primaryIndicatorName. */ public String getPrimaryIndicatorName() { return primaryIndicatorName; } + /** * @param primaryIndicatorName The primaryIndicatorName to set. */ public void setPrimaryIndicatorName(String primaryIndicatorName) { this.primaryIndicatorName = primaryIndicatorName; } - + + /** + * Calc processing stats + */ + public boolean isDoProcessingStats() { + return doProcessingStats; + } + + /** + * Set Calc processing stats + * @see #resetStatistics() + */ + public void setDoProcessingStats(boolean doProcessingStats) { + this.doProcessingStats = doProcessingStats; + } + /** * @return Returns the lastSendTime. */ @@ -198,6 +246,20 @@ } /** + * @return Returns the nrOfCrossContextSendRequests. + */ + public long getNrOfCrossContextSendRequests() { + return nrOfCrossContextSendRequests; + } + + /** + * @return Returns the nrOfSendRequests. + */ + public long getNrOfSendRequests() { + return nrOfSendRequests; + } + + /** * @return Returns the totalRequestTime. */ public long getTotalRequestTime() { @@ -217,6 +279,7 @@ protected java.util.regex.Pattern[] getReqFilters() { return reqFilters; } + /** * @param reqFilters The reqFilters to set. */ @@ -224,8 +287,28 @@ this.reqFilters = reqFilters; } + // --------------------------------------------------------- Public Methods + /** + * Register all cross context sessions inside endAccess. + * Use a list with contains check, that the Portlet API can include a lot of fragments from same or + * different applications with session changes. + * + * @param session cross context session + */ + public void registerReplicationSession(DeltaSession session) { + List sessions = (List)crossContextSessions.get(); + if(sessions != null) { + if(!sessions.contains(session)) { + if(log.isDebugEnabled()) + log.debug(sm.getString("ReplicationValve.crossContext.registerSession", + session.getIdInternal(), + session.getManager().getContainer().getName())); + sessions.add(session); + } + } + } /** * Log the interesting request parameters, invoke the next Valve in the @@ -240,45 +323,61 @@ public void invoke(Request request, Response response) throws IOException, ServletException { - long totalstart = System.currentTimeMillis(); + long totalstart = 0; + //this happens before the request - if (primaryIndicator) + if(isDoProcessingStats()) { + totalstart = System.currentTimeMillis(); + } + if (primaryIndicator) { createPrimaryIndicator(request) ; - getNext().invoke(request, response); - //this happens after the request - long start = System.currentTimeMillis(); - Manager manager = request.getContext().getManager(); - if (manager != null && manager instanceof ClusterManager) { - ClusterManager clusterManager = (ClusterManager) manager; - CatalinaCluster containerCluster = (CatalinaCluster) getContainer() - .getCluster(); - if (containerCluster == null) { - if (log.isWarnEnabled()) - log.warn(sm.getString("ReplicationValve.nocluster")); - return; - } - // valve cluster can access manager - other cluster handle replication - // at host level - hopefully! - if(containerCluster.getManager(clusterManager.getName()) == null) - return ; - if(containerCluster.getMembers().length > 0 ) { - try { - // send invalid sessions - // DeltaManager returns String[0] - if (!(clusterManager instanceof DeltaManager)) - sendInvalidSessions(clusterManager, containerCluster); - // send replication - sendSessionReplicationMessage(request, clusterManager, containerCluster); - } catch (Exception x) { - log.error(sm.getString("ReplicationValve.send.failure"), x); - } finally { - long stop = System.currentTimeMillis(); - updateStats(stop - totalstart, stop - start); + } + Context context = request.getContext(); + boolean isCrossContext = context != null + && context instanceof StandardContext + && ((StandardContext) context).getCrossContext(); + try { + if(isCrossContext) { + if(log.isDebugEnabled()) + log.debug(sm.getString("ReplicationValve.crossContext.add")); + //FIXME add Pool of Arraylists + crossContextSessions.set(new ArrayList()); + } + getNext().invoke(request, response); + Manager manager = request.getContext().getManager(); + if (manager != null && manager instanceof ClusterManager) { + ClusterManager clusterManager = (ClusterManager) manager; + CatalinaCluster containerCluster = (CatalinaCluster) getContainer() + .getCluster(); + if (containerCluster == null) { + if (log.isWarnEnabled()) + log.warn(sm.getString("ReplicationValve.nocluster")); + return; } + // valve cluster can access manager - other cluster handle replication + // at host level - hopefully! + if(containerCluster.getManager(clusterManager.getName()) == null) + return ; + if(containerCluster.hasMembers()) { + sendRepilicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster); + } else { + resetReplicationRequest(request,isCrossContext); + } + } + } finally { + // Array must be remove: Current master request send endAccess at recycle. + // Don't register this request session again! + if(isCrossContext) { + if(log.isDebugEnabled()) + log.debug(sm.getString("ReplicationValve.crossContext.remove")); + // crossContextSessions.remove() only exist at Java 5 + // register ArrayList at a pool + crossContextSessions.set(null); } } } - + + /** * reset the active statitics */ @@ -288,6 +387,8 @@ lastSendTime = 0 ; nrOfFilterRequests = 0 ; nrOfRequests = 0 ; + nrOfSendRequests = 0; + nrOfCrossContextSendRequests = 0; } /** @@ -306,12 +407,99 @@ // --------------------------------------------------------- Protected Methods /** - * Send Cluster Replication Request - * @see DeltaManager#requestCompleted(String) - * @see SimpleTcpCluster#send(ClusterMessage) * @param request - * @param manager - * @param cluster + * @param totalstart + * @param isCrossContext + * @param clusterManager + * @param containerCluster + */ + protected void sendRepilicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) { + //this happens after the request + long start = 0; + if(isDoProcessingStats()) { + start = System.currentTimeMillis(); + } + try { + // send invalid sessions + // DeltaManager returns String[0] + if (!(clusterManager instanceof DeltaManager)) + sendInvalidSessions(clusterManager, containerCluster); + // send replication + sendSessionReplicationMessage(request, clusterManager, containerCluster); + if(isCrossContext) + sendCrossContextSession(containerCluster); + } catch (Exception x) { + // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes! + log.error(sm.getString("ReplicationValve.send.failure"), x); + } finally { + // FIXME this stats update are not cheap!! + if(isDoProcessingStats()) { + updateStats(totalstart,start); + } + } + } + + /** + * Send all changed cross context sessions to backups + * @param containerCluster + */ + protected void sendCrossContextSession(CatalinaCluster containerCluster) { + Object sessions = crossContextSessions.get(); + if(sessions != null && sessions instanceof List + && ((List)sessions).size() >0) { + for(Iterator iter = ((List)sessions).iterator(); iter.hasNext() ;) { + Session session = (Session)iter.next(); + if(log.isDebugEnabled()) + log.debug(sm.getString("ReplicationValve.crossContext.sendDelta", + session.getManager().getContainer().getName() )); + sendMessage(session,(ClusterManager)session.getManager(),containerCluster); + if(isDoProcessingStats()) { + nrOfCrossContextSendRequests++; + } + } + } + } + + /** + * Fix memory leak for long sessions with many changes, when no backup member exists! + * @param request current request after responce is generated + * @param isCrossContext check crosscontext threadlocal + */ + protected void resetReplicationRequest(Request request, boolean isCrossContext) { + Session contextSession = request.getSessionInternal(false); + if(contextSession != null & contextSession instanceof DeltaSession){ + resetDeltaRequest(contextSession); + } + if(isCrossContext) { + Object sessions = crossContextSessions.get(); + if(sessions != null && sessions instanceof List + && ((List)sessions).size() >0) { + Iterator iter = ((List)sessions).iterator(); + for(; iter.hasNext() ;) { + Session session = (Session)iter.next(); + resetDeltaRequest(session); + } + } + } + } + + /** + * Reset DeltaRequest from session + * @param session HttpSession from current request or cross context session + */ + protected void resetDeltaRequest(Session session) { + if(log.isDebugEnabled()) { + log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , + session.getManager().getContainer().getName() )); + } + ((DeltaSession)session).resetDeltaRequest(); + } + + /** + * Send Cluster Replication Request + * @param request current request + * @param manager session manager + * @param cluster replication cluster */ protected void sendSessionReplicationMessage(Request request, ClusterManager manager, CatalinaCluster cluster) { @@ -320,26 +508,50 @@ String uri = request.getDecodedRequestURI(); // request without session change if (!isRequestWithoutSessionChange(uri)) { - if (log.isDebugEnabled()) log.debug(sm.getString("ReplicationValve.invoke.uri", uri)); - String id = session.getIdInternal(); - if (id != null) { - ClusterMessage msg = manager.requestCompleted(id); - // really send replication send request - // FIXME send directly via ClusterManager.send - if (msg != null) { - if(manager.isSendClusterDomainOnly()) - cluster.sendClusterDomain(msg); - else - cluster.send(msg); - } - } + sendMessage(session,manager,cluster); } else - nrOfFilterRequests++; + if(isDoProcessingStats()) + nrOfFilterRequests++; } } + + /** + * Send message delta message from request session + * @param request current request + * @param manager session manager + * @param cluster replication cluster + */ + protected void sendMessage(Session session, + ClusterManager manager, CatalinaCluster cluster) { + String id = session.getIdInternal(); + if (id != null) { + send(manager, cluster, id); + } + } + + /** + * send manager requestCompleted message to cluster + * @param manager SessionManager + * @param cluster replication cluster + * @param sessionId sessionid from the manager + * @see DeltaManager#requestCompleted(String) + * @see SimpleTcpCluster#send(ClusterMessage) + */ + protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) { + ClusterMessage msg = manager.requestCompleted(sessionId); + if (msg != null) { + if(manager.isSendClusterDomainOnly()) { + cluster.sendClusterDomain(msg); + } else { + cluster.send(msg); + } + if(isDoProcessingStats()) + nrOfSendRequests++; + } + } /** * check for session invalidations @@ -351,14 +563,7 @@ if ( invalidIds.length > 0 ) { for ( int i=0;i<invalidIds.length; i++ ) { try { - ClusterMessage imsg = manager.requestCompleted(invalidIds[i]); - // FIXME send directly via ClusterManager.send - if (imsg != null) { - if(manager.isSendClusterDomainOnly()) - cluster.sendClusterDomain(imsg); - else - cluster.send(imsg); - } + send(manager,cluster,invalidIds[i]); } catch ( Exception x ) { log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidIds[i]),x); } @@ -387,23 +592,27 @@ * @param requestTime * @param clusterTime */ - protected synchronized void updateStats(long requestTime, long clusterTime) { - totalSendTime+=clusterTime; - totalRequestTime+=requestTime; - nrOfRequests++; - if ( (nrOfRequests % 100) == 0 ) { - if(log.isInfoEnabled()) { + protected void updateStats(long requestTime, long clusterTime) { + synchronized(this) { + lastSendTime=System.currentTimeMillis(); + totalSendTime+=lastSendTime - clusterTime; + totalRequestTime+=lastSendTime - requestTime; + nrOfRequests++; + } + if(log.isInfoEnabled()) { + if ( (nrOfRequests % 100) == 0 ) { log.info(sm.getString("ReplicationValve.stats", new Object[]{ new Long(totalRequestTime/nrOfRequests), new Long(totalSendTime/nrOfRequests), new Long(nrOfRequests), + new Long(nrOfSendRequests), + new Long(nrOfCrossContextSendRequests), new Long(nrOfFilterRequests), new Long(totalRequestTime), new Long(totalSendTime)})); } } - lastSendTime=System.currentTimeMillis(); } @@ -442,7 +651,5 @@ } } } - - } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml?rev=366255&r1=366254&r2=366255&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Thu Jan 5 11:34:04 2006 @@ -988,6 +988,10 @@ domain="Catalina" group="Valve" type="org.apache.catalina.cluster.tcp.ReplicationValve"> + <attribute name="info" + description="Class version info" + type="java.lang.String" + writeable="false"/> <attribute name="filter" description="resource filter to disable session replication check" type="java.lang.String"/> @@ -998,12 +1002,24 @@ <attribute name="primaryIndicatorName" description="Request attribute name to indicate that request processing is at primary session node" type="java.lang.String"/> + <attribute name="doProcessingStats" + is="true" + description="active statistics counting" + type="boolean"/> <attribute name="nrOfRequests" description="number of replicated requests" type="long" writeable="false"/> <attribute name="nrOfFilterRequests" description="number of filtered requests" + type="long" + writeable="false"/> + <attribute name="nrOfSendRequests" + description="number of send requests" + type="long" + writeable="false"/> + <attribute name="nrOfCrossContextSendRequests" + description="number of send cross context session requests" type="long" writeable="false"/> <attribute name="totalRequestTime" --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]