Author: fhanik Date: Tue Mar 21 17:57:39 2006 New Revision: 387686 URL: http://svn.apache.org/viewcvs?rev=387686&view=rev Log: Initial version of the backup manager, still super buggy, need to figure out a way on how to deserialize sessions.
Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Tue Mar 21 17:57:39 2006 @@ -216,11 +216,12 @@ setConnected(false); if ( socketChannel != null ) { try { - Socket socket = socketChannel.socket(); + Socket socket = null; + //socket = socketChannel.socket(); //error free close, all the way - try {socket.shutdownOutput();}catch ( Exception x){} - try {socket.shutdownInput();}catch ( Exception x){} - try {socket.close();}catch ( Exception x){} + //try {socket.shutdownOutput();}catch ( Exception x){} + //try {socket.shutdownInput();}catch ( Exception x){} + //try {socket.close();}catch ( Exception x){} try {socketChannel.close();}catch ( Exception x){} socket = null; }finally { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Mar 21 17:57:39 2006 @@ -64,6 +64,11 @@ * The load factor used when none specified in constructor. **/ public static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * Used to identify the map + */ + final String chset = "ISO-8859-1"; //------------------------------------------------------------------------------ // INSTANCE VARIABLES @@ -76,7 +81,7 @@ private transient Object stateMutex = new Object(); private transient ArrayList mapMembers = new ArrayList(); private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; - private transient Object mapOwner; + private transient MapOwner mapOwner; //------------------------------------------------------------------------------ // CONSTRUCTORS @@ -90,7 +95,7 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public AbstractReplicatedMap(Object owner, + public AbstractReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, @@ -106,9 +111,9 @@ return new Member[] {m}; } - private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) { + private void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) { this.mapOwner = owner; - final String chset = "ISO-8859-1"; + this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; @@ -132,6 +137,7 @@ false, null, null, null, wrap(channel.getLocalMember(false))); Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, channelSendOptions, timeout); for (int i = 0; i < resp.length; i++) { + mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } catch (ChannelException x) { @@ -140,6 +146,7 @@ //transfer state from another map transferState(); + printMap(); } public void breakdown() { @@ -305,6 +312,7 @@ //backup request if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) { + System.out.println("Received a retrieve request for id:"+mapmsg.getKey()); MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); if (entry == null)return null; mapmsg.setValue( (Serializable) entry.getValue()); @@ -377,6 +385,7 @@ } if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) { + System.out.println("Received a backup request for id:"+mapmsg.getKey()); MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); if (entry == null) { entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); @@ -386,7 +395,6 @@ if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); } - super.put(entry.getKey(), entry); } else { entry.setBackup(true); entry.setProxy(false); @@ -414,8 +422,9 @@ entry.setValue(mapmsg.getValue()); } //end if } //end if + super.put(entry.getKey(), entry); } //end if - + printMap(); } public boolean accept(Serializable msg, Member sender) { @@ -426,10 +435,15 @@ } public void mapMemberAdded(Member member) { + System.out.println("Received Member added:"+member.getName()); + if ( member.equals(getChannel().getLocalMember(false)) ) return; + System.out.println("Received Member added2:"+member.getName()); //select a backup node if we don't have one synchronized (mapMembers) { if (!mapMembers.contains(member) ) mapMembers.add(member); } + System.out.println("Received Member added3:"+member.getName()); + printMap(); synchronized (stateMutex) { Iterator i = super.entrySet().iterator(); while (i.hasNext()) { @@ -460,6 +474,8 @@ } public void memberDisappeared(Member member) { + Exception ex = new Exception("[DEBUG] Removing member:"+member.getName()); + ex.printStackTrace(); synchronized (mapMembers) { mapMembers.remove(member); } @@ -498,13 +514,41 @@ // METHODS TO OVERRIDE //------------------------------------------------------------------------------ + protected void printMap() { + try { + System.out.println("\nMap["+((Object)this).toString()+"; " + new String(mapContextName, chset) + ", Map Size:" + super.size()); + Member[] mbrs = getMapMembers(); + for ( int i=0; i<mbrs.length;i++ ) { + System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName()); + } + Iterator i = super.entrySet().iterator(); + int cnt = 0; + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + System.out.println( (++cnt) + ". " + e.getValue()); + } + System.out.println("EndMap]\n\n"); + }catch ( Exception ignore) { + ignore.printStackTrace(); + } + } + +//------------------------------------------------------------------------------ +// Map Owner - serialization/deserialization +//------------------------------------------------------------------------------ + public static interface MapOwner { + + public byte[] serialize(Object mapObject) throws IOException; + + public Serializable deserialize(byte[] data) throws ClassNotFoundException,IOException; + + } //------------------------------------------------------------------------------ // Map Entry class //------------------------------------------------------------------------------ - public static class MapEntry - implements Map.Entry { + public static class MapEntry implements Map.Entry { private boolean backup; private boolean proxy; private Member[] backupNodes; @@ -564,29 +608,12 @@ return key; } - public byte[] getDiff() throws IOException { - if (isDiffable()) { - return ( (ReplicatedMapEntry) value).getDiff(); - } else { - return getData(); - } - } - public int hashCode() { - return value.hashCode(); + return key.hashCode(); } public boolean equals(Object o) { - return value.equals(o); - } - - /** - * returns the entire object as a byte array - * @return byte[] - * @throws IOException - */ - public byte[] getData() throws IOException { - return (new ObjectStreamable(value)).getBuf().getArray(); + return key.equals(o); } /** @@ -614,6 +641,16 @@ value = XByteBuffer.deserialize(data, offset, length); } } + + public String toString() { + StringBuffer buf = new StringBuffer("MapEntry[key:"); + buf.append(getKey()).append("; "); + buf.append("value:").append(getValue()).append("; "); + buf.append("primary:").append(isPrimary()).append("; "); + buf.append("backup:").append(isBackup()).append("; "); + buf.append("proxy:").append(isProxy()).append(";]"); + return buf.toString(); + } } @@ -640,8 +677,7 @@ public MapMessage() {} - public MapMessage(byte[] mapId, - int msgtype, boolean diff, + public MapMessage(byte[] mapId,int msgtype, boolean diff, Serializable key, Serializable value, byte[] diffvalue, Member[] nodes) { this.mapId = mapId; @@ -751,9 +787,8 @@ out.write(d); } } - } - + public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(mapId.length); out.write(mapId); @@ -803,60 +838,6 @@ } } //MapMessage -//------------------------------------------------------------------------------ -// streamable class -//------------------------------------------------------------------------------ - - public static class ObjectStreamable - implements Streamable { - private DirectByteArrayOutputStream buf; - private int pos = 0; - public ObjectStreamable(Serializable value) throws IOException { - buf = new DirectByteArrayOutputStream(1024); - ObjectOutputStream out = new ObjectOutputStream(buf); - out.writeObject(value); - out.flush(); - } - - /** - * returns true if the stream has reached its end - * @return boolean - */ - public synchronized boolean eof() { - return (pos >= buf.size()); - - } - - /** - * write data into the byte array starting at offset, maximum bytes read are (data.length-offset) - * @param data byte[] - the array to read data into - * @param offset int - start position for writing data - * @return int - the number of bytes written into the data buffer - */ - public synchronized int write(byte[] data, int offset) throws IOException { - int length = Math.min(data.length - offset, buf.size() - pos); - System.arraycopy(buf.getArrayDirect(), pos, data, offset, length); - pos = pos + length; - return length; - } - - public synchronized int read(byte[] data, int offset, int length) throws IOException { - return -1; - } - - public DirectByteArrayOutputStream getBuf() { - return buf; - } - - public int size() { - return buf.size(); - } - - public int pos() { - return pos; - } - - } public Channel getChannel() { return channel; @@ -886,7 +867,7 @@ return mapOwner; } - public void setMapOwner(Object mapOwner) { + public void setMapOwner(MapOwner mapOwner) { this.mapOwner = mapOwner; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Tue Mar 21 17:57:39 2006 @@ -29,6 +29,7 @@ import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner; /** * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. @@ -84,7 +85,7 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) { + public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) { super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT); } @@ -95,7 +96,7 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ - public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { + public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT); } @@ -105,7 +106,7 @@ * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ - public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) { + public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName) { super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT); } @@ -142,29 +143,39 @@ } public Object get(Object key) { + System.out.println("Getting session id:"+key); + printMap(); MapEntry entry = (MapEntry)super.get(key); if ( entry == null ) return null; if ( !entry.isPrimary() ) { //if the message is not primary, we need to retrieve the latest value try { - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, - (Serializable) key, null, null, null); - Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout()); - if (resp == null || resp.length == 0) { - //no responses - log.warn("Unable to retrieve remote object for key:" + key); - return null; - } - msg = (MapMessage) resp[0].getMessage(); - Member[] backup = entry.getBackupNodes(); - if ( entry.getValue() instanceof ReplicatedMapEntry ) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - val.setOwner(getMapOwner()); + Member[] backup = null; + MapMessage msg = null; + if ( !entry.isBackup() ) { + //make sure we don't retrieve from ourselves + System.out.println("Retrieving from remote session id:"+key); + msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, + (Serializable) key, null, null, null); + Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout()); + if (resp == null || resp.length == 0) { + //no responses + log.warn("Unable to retrieve remote object for key:" + key); + return null; + } + msg = (MapMessage) resp[0].getMessage(); + + backup = entry.getBackupNodes(); + if ( entry.getValue() instanceof ReplicatedMapEntry ) { + ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); + val.setOwner(getMapOwner()); + } + entry.setValue(msg.getValue()); } if (entry.isBackup()) { //select a new backup node - backup = publishEntryInfo(key, msg.getValue()); + backup = publishEntryInfo(key, entry.getValue()); } else if ( entry.isProxy() ) { //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); @@ -174,7 +185,7 @@ entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); - entry.setValue(msg.getValue()); + } catch (ChannelException x) { log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); @@ -197,6 +208,7 @@ public Object put(Object key, Object value) { + System.out.println("Adding session id:"+key); if ( !(key instanceof Serializable) ) throw new IllegalArgumentException("Key is not serializable:"+key.getClass().getName()); if ( value == null ) return remove(key); if ( !(value instanceof Serializable) ) throw new IllegalArgumentException("Value is not serializable:"+value.getClass().getName()); @@ -216,6 +228,7 @@ log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); } super.put(key,entry); + printMap(); return old; } @@ -293,7 +306,7 @@ while ( i.hasNext() ) { Map.Entry e = (Map.Entry)i.next(); MapEntry entry = (MapEntry)e.getValue(); - if ( entry.isPrimary() ) set.add(entry.getValue()); + if ( entry.isPrimary() ) set.add(entry); } return Collections.unmodifiableSet(set); } @@ -342,7 +355,7 @@ while ( i.hasNext() ) { Map.Entry e = (Map.Entry)i.next(); MapEntry entry = (MapEntry)e.getValue(); - if ( entry.isPrimary() ) values.add(entry.getValue()); + if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue()); } return Collections.unmodifiableCollection(values); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Mar 21 17:57:39 2006 @@ -29,6 +29,7 @@ import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapOwner; /** * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical @@ -66,7 +67,7 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) { + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) { super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT); } @@ -77,7 +78,7 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ - public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT); } @@ -87,7 +88,7 @@ * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ - public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) { + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName) { super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT); } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 17:57:39 2006 @@ -45,7 +45,7 @@ protected SimpleTableDemo table; public MapDemo(Channel channel ) { - map = new LazyReplicatedMap(this,channel,5000, "MapDemo"); + map = new LazyReplicatedMap(null,channel,5000, "MapDemo"); table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName()); channel.addChannelListener(this); channel.addMembershipListener(this); Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/CatalinaCluster.java Tue Mar 21 17:57:39 2006 @@ -123,6 +123,7 @@ public Manager getManager(String name); public void removeManager(String name,Manager manager); public void addManager(String name,Manager manager); + public String getManagerName(String name, Manager manager); public Valve[] getValves(); public void setChannel(Channel channel); Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=387686&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java Tue Mar 21 17:57:39 2006 @@ -0,0 +1,265 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.ha.session; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.apache.catalina.LifecycleException; +import org.apache.catalina.Loader; +import org.apache.catalina.Session; +import org.apache.catalina.ha.CatalinaCluster; +import org.apache.catalina.ha.ClusterManager; +import org.apache.catalina.ha.ClusterMessage; +import org.apache.catalina.session.StandardManager; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.io.ReplicationStream; +import org.apache.catalina.tribes.tipis.LazyReplicatedMap; + +/** + [EMAIL PROTECTED] Filip Hanik + [EMAIL PROTECTED] 1.0 + */ +public class BackupManager extends StandardManager implements ClusterManager +{ + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( BackupManager.class ); + + protected static long DEFAULT_REPL_TIMEOUT = 15000;//15 seconds + + /** Set to true if we don't want the sessions to expire on shutdown */ + protected boolean mExpireSessionsOnShutdown = true; + + /** + * The name of this manager + */ + protected String name; + + /** + * A reference to the cluster + */ + protected CatalinaCluster cluster; + + /** + * Should listeners be notified? + */ + private boolean notifyListenersOnReplication; + + + /** + * Constructor, just calls super() + * + */ + public BackupManager() { + super(); + } + + +//******************************************************************************/ +// ClusterManager Interface +//******************************************************************************/ + + public void messageDataReceived(ClusterMessage msg) { + } + + public boolean isSendClusterDomainOnly() { + return false; + } + + /** + * @param sendClusterDomainOnly The sendClusterDomainOnly to set. + */ + public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) { + } + + /** + * @return Returns the defaultMode. + */ + public boolean isDefaultMode() { + return false; + } + /** + * @param defaultMode The defaultMode to set. + */ + public void setDefaultMode(boolean defaultMode) { + } + + public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) + { + mExpireSessionsOnShutdown = expireSessionsOnShutdown; + } + + public void setCluster(CatalinaCluster cluster) { + if(log.isDebugEnabled()) + log.debug("Cluster associated with SimpleTcpReplicationManager"); + this.cluster = cluster; + } + + public boolean getExpireSessionsOnShutdown() + { + return mExpireSessionsOnShutdown; + } + + + /** + * Override persistence since they don't go hand in hand with replication for now. + */ + public void unload() throws IOException { + } + + public ClusterMessage requestCompleted(String sessionId) { + LazyReplicatedMap map = (LazyReplicatedMap)sessions; + map.replicate(sessionId,false); + return null; + } + + +//========================================================================= +// OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION +//========================================================================= + + public Session createEmptySession() { + return new DeltaSession(this); + } + + /** + * Open Stream and use correct ClassLoader (Container) Switch + * ThreadClassLoader + * + * @param data + * @return The object input stream + * @throws IOException + */ + public ReplicationStream getReplicationStream(byte[] data) throws IOException { + return getReplicationStream(data,0,data.length); + } + + public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException { + ByteArrayInputStream fis =null; + ReplicationStream ois = null; + Loader loader = null; + ClassLoader classLoader = null; + //fix to be able to run the DeltaManager + //stand alone without a container. + //use the Threads context class loader + if (container != null) loader = container.getLoader(); + if (loader != null) classLoader = loader.getClassLoader(); + else classLoader = Thread.currentThread().getContextClassLoader(); + //end fix + fis = new ByteArrayInputStream(data, offset, length); + if ( classLoader == Thread.currentThread().getContextClassLoader() ) { + ois = new ReplicationStream(fis, new ClassLoader[] {classLoader}); + } else { + ois = new ReplicationStream(fis, new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()}); + } + return ois; + } + + + + + public String getName() { + return this.name; + } + /** + * Prepare for the beginning of active use of the public methods of this + * component. This method should be called after <code>configure()</code>, + * and before any of the public methods of the component are utilized.<BR> + * Starts the cluster communication channel, this will connect with the other nodes + * in the cluster, and request the current session state to be transferred to this node. + * @exception IllegalStateException if this component has already been + * started + * @exception LifecycleException if this component detects a fatal error + * that prevents this component from being used + */ + public void start() throws LifecycleException { + if ( this.started ) return; + + + //start the javagroups channel + try { + CatalinaCluster catclust = (CatalinaCluster)cluster; + catclust.addManager(getName(), this); + this.sessions = new LazyReplicatedMap(this, + catclust.getChannel(), + DEFAULT_REPL_TIMEOUT, + getMapName()); + super.start(); + } catch ( Exception x ) { + log.error("Unable to start BackupManager",x); + throw new LifecycleException("Failed to start BackupManager",x); + } + } + + public String getMapName() { + CatalinaCluster catclust = (CatalinaCluster)cluster; + Member local = catclust.getLocalMember(); + return catclust.getManagerName(getName(),this)+"-"+local.getDomain(); + } + + /** + * Gracefully terminate the active use of the public methods of this + * component. This method should be the last one called on a given + * instance of this component.<BR> + * This will disconnect the cluster communication channel and stop the listener thread. + * @exception IllegalStateException if this component has not been started + * @exception LifecycleException if this component detects a fatal error + * that needs to be reported + */ + public void stop() throws LifecycleException + { + if ( !this.started ) return; + super.stop(); + try { + cluster.removeManager(getName(),this); + LazyReplicatedMap map = (LazyReplicatedMap)sessions; + map.breakdown(); + } catch ( Exception x ){ + log.error("Unable to stop BackupManager",x); + throw new LifecycleException("Failed to stop BackupManager",x); + } + } + + public void setDistributable(boolean dist) { + this.distributable = dist; + } + + public boolean getDistributable() { + return distributable; + } + + public void setName(String name) { + this.name = name; + } + public boolean isNotifyListenersOnReplication() { + return notifyListenersOnReplication; + } + public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) { + this.notifyListenersOnReplication = notifyListenersOnReplication; + } + + + /* + * @see org.apache.catalina.ha.ClusterManager#getCluster() + */ + public CatalinaCluster getCluster() { + return cluster; + } + + public String[] getInvalidatedSessions() { + return new String[0]; + } + +} Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaSession.java Tue Mar 21 17:57:39 2006 @@ -261,10 +261,13 @@ * @param manager * The manager with which this Session is associated */ + public DeltaSession() { + this.resetDeltaRequest(); + } + public DeltaSession(Manager manager) { - super(); + this(); this.manager = manager; - this.resetDeltaRequest(); } // ----------------------------------------------------- ReplicatedMapEntry @@ -332,7 +335,8 @@ } public void setOwner(Object owner) { - if ( owner instanceof ClusterManager ) { + if ( owner instanceof ClusterManager && getManager()==null) { + System.out.println("Setting owner for session:"+getIdInternal()+" to:"+owner); ClusterManager cm = (ClusterManager)owner; this.setManager(cm); this.setValid(true); @@ -777,7 +781,9 @@ ((DeltaManager)manager).getName(), new Boolean(isPrimarySession()), expiredId)); - ( (DeltaManager) manager).sessionExpired(expiredId); + if ( manager instanceof DeltaManager ) { + ( (DeltaManager) manager).sessionExpired(expiredId); + } } } Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java Tue Mar 21 17:57:39 2006 @@ -59,11 +59,9 @@ * When a session is replicated (not an attribute added/removed) the session is serialized into * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods. */ -public class SimpleTcpReplicationManager extends StandardManager -implements ClusterManager +public class SimpleTcpReplicationManager extends StandardManager implements ClusterManager { - public static org.apache.commons.logging.Log log = - org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class ); + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class ); //the channel configuration protected String mChannelConfig = null; Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java?rev=387686&r1=387685&r2=387686&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/tcp/SimpleTcpCluster.java Tue Mar 21 17:57:39 2006 @@ -474,9 +474,7 @@ * @see DeltaManager#start() */ public synchronized Manager createManager(String name) { - if (log.isDebugEnabled()) - log.debug("Creating ClusterManager for context " + name - + " using class " + getManagerClassName()); + if (log.isDebugEnabled()) log.debug("Creating ClusterManager for context " + name + " using class " + getManagerClassName()); Manager manager = null; try { manager = (Manager) getClass().getClassLoader().loadClass(getManagerClassName()).newInstance(); @@ -505,14 +503,11 @@ public void removeManager(String name,Manager manager) { if (manager != null) { // Notify our interested LifecycleListeners - lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT, - manager); + lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,manager); managers.remove(getManagerName(name,manager)); - if (manager instanceof ClusterManager) - ((ClusterManager) manager).setCluster(null); + if (manager instanceof ClusterManager) ((ClusterManager) manager).setCluster(null); // Notify our interested LifecycleListeners - lifecycle - .fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager); + lifecycle.fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager); } } @@ -528,8 +523,7 @@ */ public void addManager(String name, Manager manager) { if (!manager.getDistributable()) { - log.warn("Manager with name " + name - + " is not distributable, can't add as cluster manager"); + log.warn("Manager with name " + name + " is not distributable, can't add as cluster manager"); return; } // Notify our interested LifecycleListeners @@ -539,8 +533,7 @@ ClusterManager cmanager = (ClusterManager) manager ; cmanager.setName(clusterName); cmanager.setCluster(this); - if(cmanager.isDefaultMode()) - transferProperty("manager",cmanager); + if(cmanager.isDefaultMode()) transferProperty("manager",cmanager); } managers.put(clusterName, manager); // Notify our interested LifecycleListeners @@ -552,7 +545,7 @@ * @param manager * @return */ - private String getManagerName(String name, Manager manager) { + public String getManagerName(String name, Manager manager) { String clusterName = name ; if(getContainer() instanceof Engine) { Container context = manager.getContainer() ; @@ -627,8 +620,7 @@ */ public void lifecycleEvent(LifecycleEvent lifecycleEvent) { if (log.isTraceEnabled()) - log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent - .getType(), lifecycleEvent.getData())); + log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent.getType(), lifecycleEvent.getData())); } // ------------------------------------------------------ public --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]