Author: fhanik Date: Mon Mar 20 10:07:29 2006 New Revision: 387267 URL: http://svn.apache.org/viewcvs?rev=387267&view=rev Log: Cleaned up the formatting of the DeltaManager, no code changes. This is needed as a base for creating a backup manager that uses a replicated map and hence has no need to do any message transfers.
Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java?rev=387267&r1=387266&r2=387267&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaManager.java Mon Mar 20 10:07:29 2006 @@ -17,10 +17,7 @@ package org.apache.catalina.ha.session; import java.beans.PropertyChangeEvent; -import java.beans.PropertyChangeListener; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; @@ -34,22 +31,18 @@ import org.apache.catalina.Context; import org.apache.catalina.Engine; import org.apache.catalina.Host; -import org.apache.catalina.Valve; -import org.apache.catalina.Lifecycle; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleListener; -import org.apache.catalina.Loader; import org.apache.catalina.Session; +import org.apache.catalina.Valve; +import org.apache.catalina.core.StandardContext; import org.apache.catalina.ha.CatalinaCluster; -import org.apache.catalina.ha.ClusterManager; import org.apache.catalina.ha.ClusterMessage; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.session.ManagerBase; import org.apache.catalina.ha.tcp.ReplicationValve; +import 
org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.io.ReplicationStream; import org.apache.catalina.util.LifecycleSupport; import org.apache.catalina.util.StringManager; -import org.apache.catalina.core.StandardContext; -import org.apache.catalina.tribes.io.ReplicationStream; /** * The DeltaManager manages replicated sessions by only replicating the deltas @@ -73,15 +66,12 @@ public class DeltaManager extends ClusterManagerBase{ // ---------------------------------------------------- Security Classes - - public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory - .getLog(DeltaManager.class); + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(DeltaManager.class); /** * The string manager for this package. */ - protected static StringManager sm = StringManager - .getManager(Constants.Package); + protected static StringManager sm = StringManager.getManager(Constants.Package); // ----------------------------------------------------- Instance Variables @@ -99,11 +89,8 @@ * The descriptive name of this Manager implementation (for logging). */ protected static String managerName = "DeltaManager"; - protected String name = null; - protected boolean defaultMode = false; - private CatalinaCluster cluster = null; /** @@ -120,77 +107,47 @@ * The maximum number of active Sessions allowed, or -1 for no limit. 
*/ private int maxActiveSessions = -1; - private boolean expireSessionsOnShutdown = false; - private boolean notifyListenersOnReplication = true; - private boolean notifySessionListenersOnReplication = true; - private boolean stateTransfered = false ; - private int stateTransferTimeout = 60; - private boolean sendAllSessions = true; - private boolean sendClusterDomainOnly = true ; - private int sendAllSessionsSize = 1000 ; /** * wait time between send session block (default 2 sec) */ private int sendAllSessionsWaitTime = 2 * 1000 ; - private ArrayList receivedMessageQueue = new ArrayList() ; - private boolean receiverQueue = false ; - private boolean stateTimestampDrop = true ; - private long stateTransferCreateSendTime; // ------------------------------------------------------------------ stats attributes int rejectedSessions = 0; - private long sessionReplaceCounter = 0 ; - long processingTime = 0; - private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ; - private long counterSend_EVT_ALL_SESSION_DATA = 0 ; - private long counterReceive_EVT_ALL_SESSION_DATA = 0 ; - private long counterReceive_EVT_SESSION_CREATED = 0 ; - private long counterReceive_EVT_SESSION_EXPIRED = 0; - private long counterReceive_EVT_SESSION_ACCESSED = 0 ; - private long counterReceive_EVT_SESSION_DELTA = 0; - private long counterSend_EVT_GET_ALL_SESSIONS = 0 ; - private long counterSend_EVT_SESSION_CREATED = 0; - private long counterSend_EVT_SESSION_DELTA = 0 ; - private long counterSend_EVT_SESSION_ACCESSED = 0; - private long counterSend_EVT_SESSION_EXPIRED = 0; - private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ; - private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ; - private int counterNoStateTransfered = 0 ; // ------------------------------------------------------------- Constructor - public DeltaManager() { super(); } @@ -203,9 +160,7 @@ * <code><description>/<version></code>. 
*/ public String getInfo() { - - return (info); - + return info; } public void setName(String name) { @@ -216,9 +171,7 @@ * Return the descriptive short name of this Manager implementation. */ public String getName() { - - return (name); - + return name; } /** @@ -433,9 +386,7 @@ * Return the maximum number of active Sessions allowed, or -1 for no limit. */ public int getMaxActiveSessions() { - return (this.maxActiveSessions); - } /** @@ -445,12 +396,9 @@ * The new maximum number of sessions */ public void setMaxActiveSessions(int max) { - int oldMaxActiveSessions = this.maxActiveSessions; this.maxActiveSessions = max; - support.firePropertyChange("maxActiveSessions", new Integer( - oldMaxActiveSessions), new Integer(this.maxActiveSessions)); - + support.firePropertyChange("maxActiveSessions", new Integer(oldMaxActiveSessions), new Integer(this.maxActiveSessions)); } /** @@ -492,8 +440,7 @@ /** * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set. 
*/ - public void setNotifySessionListenersOnReplication( - boolean notifyListenersCreateSessionOnReplication) { + public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) { this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication; } @@ -510,8 +457,7 @@ return notifyListenersOnReplication; } - public void setNotifyListenersOnReplication( - boolean notifyListenersOnReplication) { + public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) { this.notifyListenersOnReplication = notifyListenersOnReplication; } @@ -546,7 +492,6 @@ * The associated Container */ public void setContainer(Container container) { - // De-register from the old Container (if any) if ((this.container != null) && (this.container instanceof Context)) ((Context) this.container).removePropertyChangeListener(this); @@ -556,8 +501,7 @@ // Register with the new Container (if any) if ((this.container != null) && (this.container instanceof Context)) { - setMaxInactiveInterval(((Context) this.container) - .getSessionTimeout() * 60); + setMaxInactiveInterval(((Context) this.container).getSessionTimeout() * 60); ((Context) this.container).addPropertyChangeListener(this); } @@ -596,19 +540,16 @@ * @return The session */ public Session createSession(String sessionId, boolean distribute) { - if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) { rejectedSessions++; throw new IllegalStateException(sm.getString("deltaManager.createSession.ise")); } - DeltaSession session = (DeltaSession) super.createSession(sessionId) ; if (distribute) { sendCreateSession(session.getId(), session); } if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size()))); - return (session); } @@ -626,8 +567,7 @@ null, sessionId, sessionId + "-" + System.currentTimeMillis()); - if (log.isDebugEnabled()) - 
log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId)); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId)); counterSend_EVT_SESSION_CREATED++; send(msg); } @@ -671,8 +611,7 @@ * @throws ClassNotFoundException * @throws IOException */ - protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) - throws ClassNotFoundException, IOException { + protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException { ReplicationStream ois = getReplicationStream(data); session.getDeltaRequest().readExternal(ois); ois.close(); @@ -687,8 +626,7 @@ * @return serialized delta request * @throws IOException */ - protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest) - throws IOException { + protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); deltaRequest.writeExternal(oos); @@ -706,8 +644,7 @@ * @exception IOException * if an input/output error occurs */ - protected void deserializeSessions(byte[] data) throws ClassNotFoundException, - IOException { + protected void deserializeSessions(byte[] data) throws ClassNotFoundException,IOException { // Initialize our internal data structures //sessions.clear(); //should not do this @@ -741,10 +678,7 @@ } else { sessionReplaceCounter++; // FIXME better is to grap this sessions again ! 
- if (log.isWarnEnabled()) - log.warn(sm.getString( - "deltaManager.loading.existing.session", - session.getIdInternal())); + if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session",session.getIdInternal())); } add(session); } @@ -757,14 +691,12 @@ } finally { // Close the input stream try { - if (ois != null) - ois.close(); + if (ois != null) ois.close(); } catch (IOException f) { // ignored } ois = null; - if (originalLoader != null) - Thread.currentThread().setContextClassLoader(originalLoader); + if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader); } } @@ -820,9 +752,7 @@ * The listener to add */ public void addLifecycleListener(LifecycleListener listener) { - lifecycle.addLifecycleListener(listener); - } /** @@ -830,9 +760,7 @@ * Lifecycle has no listeners registered, a zero-length array is returned. */ public LifecycleListener[] findLifecycleListeners() { - return lifecycle.findLifecycleListeners(); - } /** @@ -842,9 +770,7 @@ * The listener to remove */ public void removeLifecycleListener(LifecycleListener listener) { - lifecycle.removeLifecycleListener(listener); - } /** @@ -857,8 +783,7 @@ * component from being used */ public void start() throws LifecycleException { - if (!initialized) - init(); + if (!initialized) init(); // Validate and update our current component state if (started) { @@ -875,6 +800,7 @@ //the channel is already running Cluster cluster = getCluster() ; // stop remove cluster binding + //wow, how many nested levels of if statements can we have ;) if(cluster == null) { Container context = getContainer() ; if(context != null && context instanceof Context) { @@ -888,7 +814,7 @@ if(engine != null && engine instanceof Engine) { cluster = engine.getCluster(); if(cluster != null && cluster instanceof CatalinaCluster) { - setCluster((CatalinaCluster) cluster) ; + setCluster((CatalinaCluster) cluster) ; } } else { cluster = null ; @@ -908,13 +834,10 @@ } else if( 
cluster.getContainer() instanceof Engine){ type = "Engine" ; } - log.info(sm - .getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName())); + log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName())); } } - if (log.isInfoEnabled()) - log.info(sm - .getString("deltaManager.startClustering", getName())); + if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName())); //to survice context reloads, as only a stop/start is called, not // createManager ((CatalinaCluster)cluster).addManager(getName(), this); @@ -937,9 +860,7 @@ if(mbr == null) { // No domain member found return; } - SessionMessage msg = new SessionMessageImpl(this.getName(), - SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL", - "GET-ALL-" + getName()); + SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName()); // set reference time stateTransferCreateSendTime = beforeSendTime ; // request session state @@ -951,37 +872,22 @@ receiverQueue = true ; } cluster.send(msg, mbr); - if (log.isWarnEnabled()) - log.warn(sm.getString("deltaManager.waitForSessionState", - getName(), mbr)); + if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr)); // FIXME At sender ack mode this method check only the state transfer and resend is a problem! waitForSendAllSessions(beforeSendTime); } finally { synchronized(receivedMessageQueue) { - for (Iterator iter = receivedMessageQueue.iterator(); iter - .hasNext();) { + for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) { SessionMessage smsg = (SessionMessage) iter.next(); if (!stateTimestampDrop) { - messageReceived(smsg, - smsg.getAddress() != null ? (Member) smsg - .getAddress() : null); + messageReceived(smsg, smsg.getAddress() != null ? 
(Member) smsg.getAddress() : null); } else { - if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS - && smsg.getTimestamp() >= stateTransferCreateSendTime) { + if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) { // FIXME handle EVT_GET_ALL_SESSIONS later - messageReceived( - smsg, - smsg.getAddress() != null ? (Member) smsg - .getAddress() - : null); + messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null); } else { if (log.isWarnEnabled()) { - log.warn(sm.getString( - "deltaManager.dropMessage", - getName(), smsg - .getEventTypeString(), - new Date(stateTransferCreateSendTime), new Date( - smsg.getTimestamp()))); + log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp()))); } } } @@ -991,8 +897,7 @@ } } } else { - if (log.isInfoEnabled()) - log.info(sm.getString("deltaManager.noMembers", getName())); + if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName())); } } @@ -1002,24 +907,22 @@ */ protected void registerSessionAtReplicationValve(DeltaSession session) { if(replicationValve == null) { - if(container instanceof StandardContext - && ((StandardContext)container).getCrossContext()) { + if(container instanceof StandardContext && ((StandardContext)container).getCrossContext()) { Cluster cluster = getCluster() ; if(cluster != null && cluster instanceof CatalinaCluster) { Valve[] valves = ((CatalinaCluster)cluster).getValves(); if(valves != null && valves.length > 0) { for(int i=0; replicationValve == null && i < valves.length ; i++ ){ - if(valves[i] instanceof ReplicationValve) - replicationValve = (ReplicationValve)valves[i] ; - } + if(valves[i] instanceof ReplicationValve) replicationValve = (ReplicationValve)valves[i] ; + }//for if(replicationValve == null && log.isDebugEnabled()) { log.debug("no ReplicationValve found for 
CrossContext Support"); - } - } - } - } - } + }//endif + }//end if + }//endif + }//end if + }//end if if(replicationValve != null) { replicationValve.registerReplicationSession(session); } @@ -1036,19 +939,13 @@ if(isSendClusterDomainOnly()) { for (int i = 0; mbr == null && i < mbrs.length; i++) { Member member = mbrs[i]; - if(localMemberDomain.equals(member.getDomain())) - mbr = member ; + if(localMemberDomain.equals(member.getDomain())) mbr = member ; } } else { - if(mbrs.length != 0 ) - mbr = mbrs[0]; + if(mbrs.length != 0 ) mbr = mbrs[0]; } - if(mbr == null && log.isWarnEnabled()) - log.warn(sm.getString("deltaManager.noMasterMember", - getName(), localMemberDomain)); - if(mbr != null && log.isDebugEnabled()) - log.warn(sm.getString("deltaManager.foundMasterMember", - getName(), mbr)); + if(mbr == null && log.isWarnEnabled()) log.warn(sm.getString("deltaManager.noMasterMember",getName(), localMemberDomain)); + if(mbr != null && log.isDebugEnabled()) log.warn(sm.getString("deltaManager.foundMasterMember",getName(), mbr)); return mbr; } @@ -1066,6 +963,7 @@ try { Thread.sleep(100); } catch (Exception sleep) { + // } reqNow = System.currentTimeMillis(); isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout())); @@ -1084,12 +982,10 @@ } if (isTimeout || (!getStateTransfered())) { counterNoStateTransfered++ ; - log.error(sm.getString("deltaManager.noSessionState", - getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime))); + log.error(sm.getString("deltaManager.noSessionState",getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime))); } else { if (log.isInfoEnabled()) - log.info(sm.getString("deltaManager.sessionReceived", - getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime))); + log.info(sm.getString("deltaManager.sessionReceived",getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime))); } } @@ -1110,14 +1006,12 @@ // Validate and update our current component state if (!started) - throw new 
LifecycleException(sm - .getString("deltaManager.notStarted")); + throw new LifecycleException(sm.getString("deltaManager.notStarted")); lifecycle.fireLifecycleEvent(STOP_EVENT, null); started = false; // Expire all active sessions - if (log.isInfoEnabled()) - log.info(sm.getString("deltaManager.expireSessions", getName())); + if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName())); Session sessions[] = findSessions(); for (int i = 0; i < sessions.length; i++) { DeltaSession session = (DeltaSession) sessions[i]; @@ -1155,11 +1049,9 @@ // Process a relevant property change if (event.getPropertyName().equals("sessionTimeout")) { try { - setMaxInactiveInterval(((Integer) event.getNewValue()) - .intValue() * 60); + setMaxInactiveInterval(((Integer) event.getNewValue()).intValue() * 60); } catch (NumberFormatException e) { - log.error(sm.getString("deltaManager.sessionTimeout", event - .getNewValue())); + log.error(sm.getString("deltaManager.sessionTimeout", event.getNewValue())); } } @@ -1179,27 +1071,26 @@ if (cmsg != null && cmsg instanceof SessionMessage) { SessionMessage msg = (SessionMessage) cmsg; switch (msg.getEventType()) { - case SessionMessage.EVT_GET_ALL_SESSIONS: - case SessionMessage.EVT_SESSION_CREATED: - case SessionMessage.EVT_SESSION_EXPIRED: - case SessionMessage.EVT_SESSION_ACCESSED: - case SessionMessage.EVT_SESSION_DELTA: { - synchronized(receivedMessageQueue) { - if(receiverQueue) { - receivedMessageQueue.add(msg); - return ; + case SessionMessage.EVT_GET_ALL_SESSIONS: + case SessionMessage.EVT_SESSION_CREATED: + case SessionMessage.EVT_SESSION_EXPIRED: + case SessionMessage.EVT_SESSION_ACCESSED: + case SessionMessage.EVT_SESSION_DELTA: { + synchronized(receivedMessageQueue) { + if(receiverQueue) { + receivedMessageQueue.add(msg); + return ; + } } + break; + } + default: { + //we didn't queue, do nothing + break; } - break; - } - default: { - //we didn't queue, do nothing - break; - } } //switch - 
messageReceived(msg, msg.getAddress() != null ? (Member) msg - .getAddress() : null); + messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null); } } @@ -1226,8 +1117,10 @@ counterSend_EVT_SESSION_DELTA++; byte[] data = unloadDeltaRequest(deltaRequest); msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_DELTA, data, sessionId, - sessionId + "-" + System.currentTimeMillis()); + SessionMessage.EVT_SESSION_DELTA, + data, + sessionId, + sessionId + "-" + System.currentTimeMillis()); session.resetDeltaRequest(); } } @@ -1235,48 +1128,42 @@ if(!session.isPrimarySession()) { counterSend_EVT_SESSION_ACCESSED++; msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, - sessionId + "-" + System.currentTimeMillis()); + SessionMessage.EVT_SESSION_ACCESSED, + null, + sessionId, + sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { - log.debug(sm.getString( - "deltaManager.createMessage.accessChangePrimary", - getName(), sessionId)); + log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId)); } } } else { // log only outside synch block! 
if (log.isDebugEnabled()) { - log.debug(sm.getString( - "deltaManager.createMessage.delta", - getName(), sessionId)); + log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId)); } } session.setPrimarySession(true); //check to see if we need to send out an access message if ((msg == null)) { - long replDelta = System.currentTimeMillis() - - session.getLastTimeReplicated(); + long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated(); if (replDelta > (getMaxInactiveInterval() * 1000)) { counterSend_EVT_SESSION_ACCESSED++; msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_ACCESSED, null, - sessionId, sessionId + "-" + System.currentTimeMillis()); + SessionMessage.EVT_SESSION_ACCESSED, + null, + sessionId, + sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { - log.debug(sm.getString( - "deltaManager.createMessage.access", getName(), - sessionId)); + log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId)); } } } //update last replicated time - if (msg != null) - session.setLastTimeReplicated(System.currentTimeMillis()); + if (msg != null) session.setLastTimeReplicated(System.currentTimeMillis()); return msg; } catch (IOException x) { - log.error(sm.getString( - "deltaManager.createMessage.unableCreateDeltaRequest", - sessionId), x); + log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x); return null; } @@ -1329,12 +1216,8 @@ */ protected void sessionExpired(String id) { counterSend_EVT_SESSION_EXPIRED++ ; - SessionMessage msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_EXPIRED, null, id, id - + "-EXPIRED-MSG"); - if (log.isDebugEnabled()) - log.debug(sm.getString("deltaManager.createMessage.expire", - getName(), id)); + SessionMessage msg = new SessionMessageImpl(getName(),SessionMessage.EVT_SESSION_EXPIRED, null, id, id+ "-EXPIRED-MSG"); + if (log.isDebugEnabled()) 
log.debug(sm.getString("deltaManager.createMessage.expire",getName(), id)); send(msg); } @@ -1348,8 +1231,7 @@ int expireDirect = 0 ; int expireIndirect = 0 ; - if(log.isDebugEnabled()) - log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length); + if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length); for (int i = 0; i < sessions.length; i++) { if (sessions[i] instanceof DeltaSession) { DeltaSession session = (DeltaSession) sessions[i]; @@ -1359,13 +1241,12 @@ expireDirect++; } else { expireIndirect++; - } - } - } - } + }//end if + }//end if + }//end if + }//for long timeEnd = System.currentTimeMillis(); - if(log.isDebugEnabled()) - log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect); + if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect); } @@ -1390,12 +1271,11 @@ boolean sameDomain= localMemberDomain.equals(sender.getDomain()); if (!sameDomain && log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain", - new Object[] {getName(), - msg.getEventTypeString(), - sender, - sender.getDomain(), - localMemberDomain } - )); + new Object[] {getName(), + msg.getEventTypeString(), + sender, + sender.getDomain(), + localMemberDomain })); } return sameDomain ; } @@ -1416,47 +1296,44 @@ return; } try { - if (log.isDebugEnabled()) - log.debug(sm.getString("deltaManager.receiveMessage.eventType", - getName(), msg.getEventTypeString(), sender)); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender)); switch (msg.getEventType()) { - case 
SessionMessage.EVT_GET_ALL_SESSIONS: { - handleGET_ALL_SESSIONS(msg,sender); - break; - } - case SessionMessage.EVT_ALL_SESSION_DATA: { - handleALL_SESSION_DATA(msg,sender); - break; - } - case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: { - handleALL_SESSION_TRANSFERCOMPLETE(msg,sender); - break; - } - case SessionMessage.EVT_SESSION_CREATED: { - handleSESSION_CREATED(msg,sender); - break; - } - case SessionMessage.EVT_SESSION_EXPIRED: { - handleSESSION_EXPIRED(msg,sender); - break; - } - case SessionMessage.EVT_SESSION_ACCESSED: { - handleSESSION_ACCESSED(msg,sender); - break; - } - case SessionMessage.EVT_SESSION_DELTA: { - handleSESSION_DELTA(msg,sender); - break; - } - default: { - //we didn't recognize the message type, do nothing - break; - } + case SessionMessage.EVT_GET_ALL_SESSIONS: { + handleGET_ALL_SESSIONS(msg,sender); + break; + } + case SessionMessage.EVT_ALL_SESSION_DATA: { + handleALL_SESSION_DATA(msg,sender); + break; + } + case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: { + handleALL_SESSION_TRANSFERCOMPLETE(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_CREATED: { + handleSESSION_CREATED(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_EXPIRED: { + handleSESSION_EXPIRED(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_ACCESSED: { + handleSESSION_ACCESSED(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_DELTA: { + handleSESSION_DELTA(msg,sender); + break; + } + default: { + //we didn't recognize the message type, do nothing + break; + } } //switch } catch (Exception x) { - log.error(sm.getString("deltaManager.receiveMessage.error", - getName()), x); + log.error(sm.getString("deltaManager.receiveMessage.error",getName()), x); } } @@ -1470,10 +1347,7 @@ */ protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) { counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ; - if (log.isDebugEnabled()) - log.debug(sm.getString( - 
"deltaManager.receiveMessage.transfercomplete", - getName(), sender.getHost(), new Integer(sender.getPort()))); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete",getName(), sender.getHost(), new Integer(sender.getPort()))); stateTransferCreateSendTime = msg.getTimestamp() ; stateTransfered = true ; } @@ -1485,15 +1359,12 @@ * @throws IOException * @throws ClassNotFoundException */ - protected void handleSESSION_DELTA(SessionMessage msg, Member sender) - throws IOException, ClassNotFoundException { + protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException { counterReceive_EVT_SESSION_DELTA++; byte[] delta = msg.getSession(); DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { - if (log.isDebugEnabled()) - log.debug(sm.getString("deltaManager.receiveMessage.delta", - getName(), msg.getSessionID())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID())); DeltaRequest dreq = loadDeltaRequest(session, delta); dreq.execute(session, notifyListenersOnReplication); session.setPrimarySession(false); @@ -1508,13 +1379,9 @@ */ protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException { counterReceive_EVT_SESSION_ACCESSED++; - DeltaSession session = (DeltaSession) findSession(msg - .getSessionID()); + DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.accessed", - getName(), msg.getSessionID())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.accessed",getName(), msg.getSessionID())); session.access(); session.setPrimarySession(false); session.endAccess(); @@ -1529,13 +1396,9 @@ */ protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException { 
counterReceive_EVT_SESSION_EXPIRED++; - DeltaSession session = (DeltaSession) findSession(msg - .getSessionID()); + DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.expired", - getName(), msg.getSessionID())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.expired",getName(), msg.getSessionID())); session.expire(notifySessionListenersOnReplication, false); } } @@ -1547,10 +1410,7 @@ */ protected void handleSESSION_CREATED(SessionMessage msg,Member sender) { counterReceive_EVT_SESSION_CREATED++; - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.createNewSession", - getName(), msg.getSessionID())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID())); DeltaSession session = (DeltaSession) createEmptySession(); session.setManager(this); session.setValid(true); @@ -1574,16 +1434,10 @@ */ protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException { counterReceive_EVT_ALL_SESSION_DATA++; - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.allSessionDataBegin", - getName())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName())); byte[] data = msg.getSession(); deserializeSessions(data); - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.allSessionDataAfter", - getName())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName())); //stateTransferred = true; } @@ -1596,13 +1450,10 @@ * @param sender * @throws IOException */ - protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) - throws IOException { + protected void 
handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException { counterReceive_EVT_GET_ALL_SESSIONS++; //get a list of all the session from this manager - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.unloadingBegin", getName())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName())); // Write the number of active sessions, followed by the details // get all sessions and serialize without sync Session[] currentSessions = findSessions(); @@ -1611,13 +1462,10 @@ sendSessions(sender, currentSessions, findSessionTimestamp); } else { // send session at blocks - int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length - : getSendAllSessionsSize(); + int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize(); Session[] sendSessions = new Session[len]; for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) { - len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - - i - : getSendAllSessionsSize(); + len = i + getSendAllSessionsSize() > currentSessions.length ? 
currentSessions.length - i : getSendAllSessionsSize(); System.arraycopy(currentSessions, i, sendSessions, 0, len); sendSessions(sender, sendSessions,findSessionTimestamp); if (getSendAllSessionsWaitTime() > 0) { @@ -1625,19 +1473,13 @@ Thread.sleep(getSendAllSessionsWaitTime()); } catch (Exception sleep) { } - } - } - } + }//end if + }//for + }//end if - SessionMessage newmsg = new SessionMessageImpl(name, - SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null, - "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED" - + getName()); + SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName()); newmsg.setTimestamp(findSessionTimestamp); - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.createMessage.allSessionTransfered", - getName())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName())); counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++; cluster.send(newmsg, sender); } @@ -1650,24 +1492,12 @@ * @param sendTimestamp * @throws IOException */ - protected void sendSessions(Member sender, Session[] currentSessions, - long sendTimestamp) throws IOException { + protected void sendSessions(Member sender, Session[] currentSessions,long sendTimestamp) throws IOException { byte[] data = serializeSessions(currentSessions); - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.receiveMessage.unloadingAfter", - getName())); - SessionMessage newmsg = new SessionMessageImpl(name, - SessionMessage.EVT_ALL_SESSION_DATA, data, - "SESSION-STATE", "SESSION-STATE-" + getName()); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter",getName())); + SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_DATA, data,"SESSION-STATE", "SESSION-STATE-" + getName()); newmsg.setTimestamp(sendTimestamp); - 
//if(isSendSESSIONSTATEcompressed()) { - // newmsg.setCompress(ClusterMessage.RESEND_ALLOWED); - //} - if (log.isDebugEnabled()) - log.debug(sm.getString( - "deltaManager.createMessage.allSessionData", - getName())); + if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData",getName())); counterSend_EVT_ALL_SESSION_DATA++; cluster.send(newmsg, sender); } Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=387267&r1=387266&r2=387267&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Mon Mar 20 10:07:29 2006 @@ -48,7 +48,7 @@ import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.ha.session.DeltaManager; -import org.apache.catalina.tribes.util.IDynamicProperty; +import org.apache.catalina.ha.util.IDynamicProperty; import org.apache.catalina.util.LifecycleSupport; import org.apache.catalina.util.StringManager; import org.apache.commons.logging.Log; @@ -353,7 +353,7 @@ * @return Member */ public Member getLocalMember() { - return channel.getLocalMember(); + return channel.getLocalMember(true); } // ------------------------------------------------------------- dynamic @@ -779,11 +779,11 @@ msg.setAddress(getLocalMember()); if (dest != null) { if (!getLocalMember().equals(dest)) { - channel.send(new Member[] {dest}, msg); + channel.send(new Member[] {dest}, msg,0); } else log.error("Unable to send message to local member " + msg); } else { - channel.send(channel.getMembers(),msg); + channel.send(channel.getMembers(),msg,0); } } catch (Exception x) { log.error("Unable to send message 
through cluster sender.", x); Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java?rev=387267&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/util/IDynamicProperty.java Mon Mar 20 10:07:29 2006 @@ -0,0 +1,57 @@ +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.ha.util; + +import java.util.Iterator; + +/** + * @author Peter Rossbach + * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ + */ + +public interface IDynamicProperty { + + /** + * set config attributes with reflect + * + * @param name + * @param value + */ + public void setProperty(String name, Object value) ; + + /** + * get current config + * + * @param key + * @return The property + */ + public Object getProperty(String key) ; + /** + * Get all properties keys + * + * @return An iterator over the property names + */ + public Iterator getPropertyNames() ; + + /** + * remove a configured property. 
+ * + * @param key + */ + public void removeProperty(String key) ; + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]