Author: fhanik Date: Wed Mar 22 19:59:38 2006 New Revision: 388020 URL: http://svn.apache.org/viewcvs?rev=388020&view=rev Log: more complete state of the replicated map
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java Wed Mar 22 19:59:38 2006 @@ -62,4 +62,8 @@ */ public long getMemberAliveTime(); + public boolean isReady(); + public boolean isSuspect(); + public boolean isFailing(); + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Wed Mar 22 19:59:38 2006 @@ -136,7 +136,7 @@ if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) { fwd = new ByteMessage(msg.getMessage().getBytes()); } else { - fwd = XByteBuffer.deserialize(msg.getMessage().getBytes()); + fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength()); } //get the actual member with the correct alive time Member source = msg.getAddress(); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java Wed Mar 22 19:59:38 2006 @@ -93,9 +93,7 @@ return clazz; } - public Class findExternalClass(String name) - throws ClassNotFoundException { - + public Class findExternalClass(String name) throws ClassNotFoundException { ClassNotFoundException cnfe = null; for (int i=0; i<classLoaders.length; i++ ) { try { @@ -107,6 +105,11 @@ } if ( cnfe != null ) throw cnfe; else throw new ClassNotFoundException(name); + } + + public void close() throws IOException { + this.classLoaders = null; + super.close(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Wed Mar 22 19:59:38 2006 @@ -23,6 +23,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.io.ObjectInputStream; /** * The XByteBuffer provides a dual functionality. @@ -520,18 +521,21 @@ public static Serializable deserialize(byte[] data, int offset, int length) throws IOException, ClassNotFoundException, ClassCastException { - return deserialize(data,offset,length,new ClassLoader[] {XByteBuffer.class.getClassLoader()}); + return deserialize(data,offset,length,null); } - + public static int invokecount = 0; public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls) throws IOException, ClassNotFoundException, ClassCastException { + synchronized (XByteBuffer.class) { invokecount++;} Object message = null; - if ( cls == null ) cls = new ClassLoader[] {XByteBuffer.class.getClassLoader()}; + if ( cls == null ) cls = new ClassLoader[0]; if (data != null) { InputStream instream = new ByteArrayInputStream(data,offset,length); - ReplicationStream stream = new ReplicationStream(instream,cls); + ObjectInputStream stream = null; + stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream); message = stream.readObject(); instream.close(); + stream.close(); } if ( message == null ) { return null; @@ -553,6 +557,7 @@ ByteArrayOutputStream outs = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(outs); out.writeObject(msg); + out.flush(); byte[] data = outs.toByteArray(); return data; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java Wed Mar 22 19:59:38 2006 @@ -23,6 +23,7 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.io.XByteBuffer; import java.util.Arrays; +import org.apache.catalina.tribes.tcp.SenderState; /** * A <b>membership</b> implementation using simple multicast. @@ -106,6 +107,17 @@ this.port = port; this.domain = domain.getBytes(); this.memberAliveTime=aliveTime; + } + + + public boolean isReady() { + return SenderState.getSenderState(this).isReady(); + } + public boolean isSuspect() { + return SenderState.getSenderState(this).isSuspect(); + } + public boolean isFailing() { + return SenderState.getSenderState(this).isFailing(); } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java Wed Mar 22 19:59:38 2006 @@ -46,18 +46,17 @@ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class); private MessageListener listener; - private String host; + private String host = "auto"; private InetAddress bind; - private int port; + private int port = 4000; private int rxBufSize = 43800; private int txBufSize = 25188; private boolean listen = false; private ThreadPool pool; private boolean direct = true; - private long tcpSelectorTimeout; - private String tcpListenAddress; + private long tcpSelectorTimeout = 100; //how many times to search for an available socket - private int autoBind = 1; + private int autoBind = 10; private int maxThreads = 25; private int minThreads = 6; @@ -233,7 +232,7 @@ } public String getTcpListenAddress() { - return tcpListenAddress; + return getHost(); } public int getAutoBind() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java Wed Mar 22 19:59:38 2006 @@ -155,13 +155,18 @@ * This is considered an asynchronized request */ if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel); - //process the message - getCallback().messageDataReceived(msgs[i]); + try { + //process the message + getCallback().messageDataReceived(msgs[i]); + }catch ( Exception e ) { + log.error("Processing of cluster message failed.",e); + } /** * Use send ack here if you want the request to complete on this * server before sending the ack to the remote server * This is considered a synchronized request */ + if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed Mar 22 19:59:38 2006 @@ -94,6 +94,7 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap + * @param cls - a list of classloaders to be used for deserialization of objects. */ public AbstractReplicatedMap(Object owner, Channel channel, @@ -101,9 +102,10 @@ String mapContextName, int initialCapacity, float loadFactor, - int channelSendOptions) { + int channelSendOptions, + ClassLoader[] cls) { super(initialCapacity, loadFactor); - init(owner, channel, mapContextName, timeout, channelSendOptions); + init(owner, channel, mapContextName, timeout, channelSendOptions, cls); } @@ -111,9 +113,9 @@ return new Member[] {m}; } - private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) { + private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { this.mapOwner = owner; - + this.externalLoaders = cls; this.channelSendOptions = channelSendOptions; this.channel = channel; this.rpcTimeout = timeout; @@ -143,7 +145,7 @@ MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, wrap(channel.getLocalMember(false))); if ( rpc) { - Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, channelSendOptions,rpcTimeout); + Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout); for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); @@ -233,7 +235,7 @@ } try { if ( entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) { - channel.send(entry.getBackupNodes(), msg, channel.SEND_OPTIONS_DEFAULT); + channel.send(entry.getBackupNodes(), msg, channelSendOptions); } } catch (ChannelException x) { log.error("Unable to replicate data.", x); @@ -266,36 +268,22 @@ null, null, null, null); Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); if (resp.length > 0) { - msg = (MapMessage) resp[0].getMessage(); - ArrayList list = (ArrayList) msg.getValue(); - for (int i = 0; i < list.size(); i++) { - messageReceived((Serializable)list.get(i),resp[0].getSource()); -// MapMessage m = (MapMessage) list.get(i); -// try { -// m.deserialize(getExternalLoaders()); -// //make sure we don't store that actual object as primary or backup -// MapEntry local = (MapEntry)super.get(m.getKey()); -// if (local != null && (!local.isProxy())) continue; -// -// //store the object -// if (m.getValue()!=null && m.getValue() instanceof ReplicatedMapEntry ) { -// ((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner()); -// } -// MapEntry entry = new MapEntry(m.getKey(), m.getValue()); -// entry.setBackup(false); -// entry.setProxy(true); -// entry.setBackupNodes(m.getBackupNodes()); -// super.put(entry.getKey(), entry); -// } catch (IOException x) { -// log.error("Unable to deserialize MapMessage.", x); -// } catch (ClassNotFoundException x) { -// log.error("Unable to deserialize MapMessage.", x); -// } - }//for + synchronized (stateMutex) { + msg = (MapMessage) resp[0].getMessage(); + msg.deserialize(getExternalLoaders()); + ArrayList list = (ArrayList) msg.getValue(); + for (int i = 0; i < list.size(); i++) { + messageReceived( (Serializable) list.get(i), resp[0].getSource()); + } //for + } } } } catch (ChannelException x) { log.error("Unable to transfer LazyReplicatedMap state.", x); + } catch (IOException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } catch (ClassNotFoundException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); } stateTransferred = true; } @@ -333,12 +321,12 @@ Map.Entry e = (Map.Entry) i.next(); MapEntry entry = (MapEntry) e.getValue(); MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY, - false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), - null, entry.getBackupNodes()); + false, (Serializable) entry.getKey(), null,null, entry.getBackupNodes()); list.add(me); } mapmsg.setValue(list); return mapmsg; + } //synchronized } @@ -426,15 +414,15 @@ diff.unlock(); } } else { - entry.setValue(mapmsg.getValue()); + if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); diff.setOwner(getMapOwner()); } //end if } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); re.setOwner(getMapOwner()); - entry.setValue(mapmsg.getValue()); + if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); } else { - entry.setValue(mapmsg.getValue()); + if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); } //end if } //end if super.put(entry.getKey(), entry); @@ -459,7 +447,8 @@ while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); MapEntry entry = (MapEntry) e.getValue(); - if (entry.isPrimary() && entry.getBackupNodes() == null && entry.getBackupNodes().length == 0) { + if ( entry == null ) continue; + if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); @@ -473,6 +462,7 @@ } public boolean inSet(Member m, Member[] set) { + if ( set == null ) return false; boolean result = false; for (int i=0; i<set.length && (!result); i++ ) if ( m.equals(set[i]) ) result = true; @@ -484,8 +474,6 @@ } public void memberDisappeared(Member member) { - Exception ex = new Exception("[DEBUG] Removing member:"+member.getName()); - ex.printStackTrace(); synchronized (mapMembers) { mapMembers.remove(member); } @@ -527,7 +515,7 @@ protected void printMap(String header) { try { System.out.println("\nDEBUG MAP:"+header); - System.out.println("Map["+((Object)this).toString()+"; " + new String(mapContextName, chset) + ", Map Size:" + super.size()); + System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size()); Member[] mbrs = getMapMembers(); for ( int i=0; i<mbrs.length;i++ ) { System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName()); @@ -658,7 +646,7 @@ // map message to send to and from other maps //------------------------------------------------------------------------------ - public static class MapMessage implements Externalizable { + public static class MapMessage implements Serializable { public static final int MSG_BACKUP = 1; public static final int MSG_RETRIEVE_BACKUP = 2; public static final int MSG_PROXY = 3; @@ -670,8 +658,8 @@ private byte[] mapId; private int msgtype; private boolean diff; - private Serializable key; - private Serializable value; + private transient Serializable key; + private transient Serializable value; private byte[] valuedata; private byte[] keydata; private byte[] diffvalue; @@ -681,7 +669,7 @@ public MapMessage(byte[] mapId,int msgtype, boolean diff, Serializable key, Serializable value, - byte[] diffvalue, Member[] nodes) { + byte[] diffvalue, Member[] nodes) { this.mapId = mapId; this.msgtype = msgtype; this.diff = diff; @@ -689,6 +677,8 @@ this.value = value; this.diffvalue = diffvalue; this.nodes = nodes; + setValue(value); + setKey(key); } public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException { @@ -717,6 +707,7 @@ if ( key!=null ) return key; if ( keydata == null || keydata.length == 0 ) return null; key = XByteBuffer.deserialize(keydata,0,keydata.length,cls); + keydata = null; return key; } @@ -737,6 +728,7 @@ if ( value!=null ) return value; if ( valuedata == null || valuedata.length == 0 ) return null; value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls); + valuedata = null;; return value; } @@ -761,7 +753,21 @@ } public void setValue(Serializable value) { - this.value = value; + try { + if ( value != null ) valuedata = XByteBuffer.serialize(value); + this.value = value; + }catch ( IOException x ) { + throw new RuntimeException(x); + } + } + + public void setKey(Serializable key) { + try { + if (key != null) keydata = XByteBuffer.serialize(key); + this.key = key; + } catch (IOException x) { + throw new RuntimeException(x); + } } protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException { @@ -775,51 +781,6 @@ return members; } - protected byte[] readBytes(ObjectInput in) throws IOException { - byte[] data = new byte[in.readInt()]; - in.read(data); - return data; - } - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - mapId = new byte[in.readInt()]; - in.read(mapId); - msgtype = in.readInt(); - switch (msgtype) { - case MSG_BACKUP: - case MSG_STATE: { - diff = in.readBoolean(); - keydata = readBytes(in); - if (diff) { - diffvalue = readBytes(in); - } else { - valuedata = readBytes(in); - } //endif - nodes = readMembers(in); - break; - } - case MSG_RETRIEVE_BACKUP: { - keydata = readBytes(in); - valuedata = readBytes(in); - break; - } - case MSG_REMOVE: { - keydata = readBytes(in); - break; - } - case MSG_PROXY: { - keydata = readBytes(in); - this.nodes = readMembers(in); - break; - } - case MSG_START: - case MSG_STOP: { - nodes = readMembers(in); - break; - } - } //switch - } //readExternal - protected void writeMembers(ObjectOutput out,Member[] members) throws IOException { if ( members == null ) members = new Member[0]; out.writeInt(members.length); @@ -832,55 +793,6 @@ } } - protected void writeBytes(ObjectOutput out, byte[] data) throws IOException { - out.writeInt(data.length); - out.write(data); - } - - protected void writeObject(ObjectOutput out, Serializable o) throws IOException { - byte[] data = XByteBuffer.serialize(o); - writeBytes(out,data); - } - - public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(mapId.length); - out.write(mapId); - out.writeInt(msgtype); - switch (msgtype) { - case MSG_BACKUP: - case MSG_STATE: { - out.writeBoolean(diff); - writeObject(out,key); - if (diff) { - out.writeInt(diffvalue.length); - out.write(diffvalue); - } else { - writeObject(out,value); - } //endif - writeMembers(out,nodes); - break; - } - case MSG_RETRIEVE_BACKUP: { - writeObject(out,key); - writeObject(out,value); - break; - } - case MSG_REMOVE: { - writeObject(out,key); - break; - } - case MSG_PROXY: { - writeObject(out,key); - writeMembers(out,nodes); - break; - } - case MSG_START: - case MSG_STOP: { - writeMembers(out,nodes); - break; - } - } //switch - } //writeExternal /** * shallow clone @@ -927,12 +839,20 @@ return externalLoaders; } + public int getChannelSendOptions() { + return channelSendOptions; + } + public void setMapOwner(Object mapOwner) { this.mapOwner = mapOwner; } public void setExternalLoaders(ClassLoader[] externalLoaders) { this.externalLoaders = externalLoaders; + } + + public void setChannelSendOptions(int channelSendOptions) { + this.channelSendOptions = channelSendOptions; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Wed Mar 22 19:59:38 2006 @@ -84,8 +84,8 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) { - super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT); + public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, ClassLoader[] cls) { + super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT,cls); } /** @@ -95,8 +95,8 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ - public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { - super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT); + public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, ClassLoader[] cls) { + super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); } /** @@ -105,8 +105,8 @@ * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ - public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) { - super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT); + public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { + super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT, cls); } @@ -166,7 +166,7 @@ ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); val.setOwner(getMapOwner()); } - entry.setValue(msg.getValue()); + if ( msg.getValue()!=null ) entry.setValue(msg.getValue()); } if (entry.isBackup()) { //select a new backup node @@ -203,7 +203,6 @@ public Object put(Object key, Object value) { - System.out.println("Adding session id:"+key); if ( !(key instanceof Serializable) ) throw new IllegalArgumentException("Key is not serializable:"+key.getClass().getName()); if ( value == null ) return remove(key); if ( !(value instanceof Serializable) ) throw new IllegalArgumentException("Value is not serializable:"+value.getClass().getName()); @@ -247,8 +246,9 @@ */ public Object remove(Object key) { MapEntry entry = (MapEntry)super.remove(key); - MapMessage msg = new MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null); + try { + MapMessage msg = new MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null); getChannel().send(getMapMembers(), msg,Channel.SEND_OPTIONS_DEFAULT); } catch ( ChannelException x ) { log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x); @@ -294,6 +294,10 @@ return super.keySet(); } + public int sizeFull() { + return super.size(); + } + public Set entrySet() { LinkedHashSet set = new LinkedHashSet(super.size()); Iterator i = super.entrySet().iterator(); @@ -318,9 +322,6 @@ return Collections.unmodifiableSet(set); } - public int sizeFull() { - return super.size(); - } public int size() { //todo, implement a counter variable instead @@ -330,7 +331,7 @@ while ( i.hasNext() ) { Map.Entry e = (Map.Entry)i.next(); MapEntry entry = (MapEntry)e.getValue(); - if ( entry.isPrimary() ) counter++; + if ( entry.isPrimary() && entry.getValue()!=null ) counter++; } return counter; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Wed Mar 22 19:59:38 2006 @@ -50,6 +50,9 @@ * @todo implement periodic sync/transfer thread * @author Filip Hanik * @version 1.0 + * + * @todo memberDisappeared, should do nothing except change map membership + * by default it relocates the primary objects */ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { @@ -66,8 +69,8 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) { - super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT); + public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor, ClassLoader[] cls) { + super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls); } /** @@ -77,8 +80,8 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ - public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { - super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT); + public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, ClassLoader[] cls) { + super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT, cls); } /** @@ -87,8 +90,8 @@ * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ - public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) { - super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT); + public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); } //------------------------------------------------------------------------------ Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Wed Mar 22 19:59:38 2006 @@ -79,6 +79,10 @@ long timeout) throws ChannelException { if ( destination==null || destination.length == 0 ) return new Response[0]; + + //avoid dead lock + channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; + RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout); try { Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Wed Mar 22 19:59:38 2006 @@ -244,7 +244,8 @@ - public static class LoadMessage extends ByteMessage implements Serializable { + //public static class LoadMessage extends ByteMessage implements Serializable { + public static class LoadMessage implements Serializable { public static byte[] outdata = new byte[size]; public static Random r = new Random(System.currentTimeMillis()); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Wed Mar 22 19:59:38 2006 @@ -45,10 +45,14 @@ protected SimpleTableDemo table; public MapDemo(Channel channel ) { - map = new LazyReplicatedMap(null,channel,5000, "MapDemo"); + map = new LazyReplicatedMap(null,channel,5000, "MapDemo",null); table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName()); channel.addChannelListener(this); channel.addMembershipListener(this); + for ( int i=0; i<1000; i++ ) { + map.put("MyKey-"+i,"My String Value-"+i); + } + this.messageReceived(null,null); } public boolean accept(Serializable msg, Member source) { Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java Wed Mar 22 19:59:38 2006 @@ -28,6 +28,7 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.io.ReplicationStream; import org.apache.catalina.tribes.tipis.LazyReplicatedMap; +import org.apache.catalina.tribes.Channel; /** [EMAIL PROTECTED] Filip Hanik @@ -56,7 +57,10 @@ * Should listeners be notified? */ private boolean notifyListenersOnReplication; - + /** + * + */ + private int mapSendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK|Channel.SEND_OPTIONS_USE_ACK; /** * Constructor, just calls super() @@ -139,7 +143,7 @@ ClassLoader classLoader = null; if (container != null) loader = container.getLoader(); if (loader != null) classLoader = loader.getClassLoader(); - else classLoader = Thread.currentThread().getContextClassLoader(); + if ( classLoader == null ) classLoader = Thread.currentThread().getContextClassLoader(); if ( classLoader == Thread.currentThread().getContextClassLoader() ) { return new ClassLoader[] {classLoader}; } else { @@ -189,8 +193,9 @@ LazyReplicatedMap map = new LazyReplicatedMap(this, catclust.getChannel(), DEFAULT_REPL_TIMEOUT, - getMapName()); - map.setExternalLoaders(getClassLoaders()); + getMapName(), + getClassLoaders()); + map.setChannelSendOptions(mapSendOptions); this.sessions = map; super.start(); } catch ( Exception x ) { @@ -246,6 +251,9 @@ this.notifyListenersOnReplication = notifyListenersOnReplication; } + public void setMapSendOptions(int mapSendOptions) { + this.mapSendOptions = mapSendOptions; + } /* * @see org.apache.catalina.ha.ClusterManager#getCluster() @@ -253,7 +261,11 @@ public CatalinaCluster getCluster() { return cluster; } - + + public int getMapSendOptions() { + return mapSendOptions; + } + public String[] getInvalidatedSessions() { return new String[0]; } Modified: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java?rev=388020&r1=388019&r2=388020&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java (original) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java Wed Mar 22 19:59:38 2006 @@ -159,16 +159,22 @@ switch ( info.getType() ) { case TYPE_ATTRIBUTE: { if ( info.getAction() == ACTION_SET ) { + if ( log.isTraceEnabled() ) log.trace("Session.setAttribute('"+info.getName()+"', '"+info.getValue()+"')"); session.setAttribute(info.getName(), info.getValue(),notifyListeners,false); - } else + } else { + if ( log.isTraceEnabled() ) log.trace("Session.removeAttribute('"+info.getName()+"')"); session.removeAttribute(info.getName(),notifyListeners,false); + } + break; }//case case TYPE_ISNEW: { + if ( log.isTraceEnabled() ) log.trace("Session.setNew('"+info.getValue()+"')"); session.setNew(((Boolean)info.getValue()).booleanValue(),false); break; }//case case TYPE_MAXINTERVAL: { + if ( log.isTraceEnabled() ) log.trace("Session.setMaxInactiveInterval('"+info.getValue()+"')"); session.setMaxInactiveInterval(((Integer)info.getValue()).intValue(),false); break; }//case @@ -341,28 +347,30 @@ return other.getName().equals(this.getName()); } - public synchronized void readExternal(java.io.ObjectInput in ) throws java.io.IOException, - java.lang.ClassNotFoundException { + public synchronized void readExternal(java.io.ObjectInput in ) throws IOException,ClassNotFoundException { //type - int //action - int //name - String + //hasvalue - boolean //value - object type = in.readInt(); action = in.readInt(); name = in.readUTF(); - value = in.readObject(); + boolean hasValue = in.readBoolean(); + if ( hasValue ) value = in.readObject(); } - public synchronized void writeExternal(java.io.ObjectOutput out) throws java.io. - IOException { + public synchronized void writeExternal(java.io.ObjectOutput out) throws IOException { //type - int //action - int //name - String + //hasvalue - boolean //value - object out.writeInt(getType()); out.writeInt(getAction()); out.writeUTF(getName()); - out.writeObject(getValue()); + out.writeBoolean(getValue()!=null); + if (getValue()!=null) out.writeObject(getValue()); } public String toString() { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]