Author: fhanik
Date: Wed Mar 22 19:59:38 2006
New Revision: 388020

URL: http://svn.apache.org/viewcvs?rev=388020&view=rev
Log:
more complete state of the replicated map

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
    
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
    
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
 Wed Mar 22 19:59:38 2006
@@ -62,4 +62,8 @@
      */
     public long getMemberAliveTime();
     
+    public boolean isReady();
+    public boolean isSuspect();
+    public boolean isFailing();
+    
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Wed Mar 22 19:59:38 2006
@@ -136,7 +136,7 @@
             if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == 
SEND_OPTIONS_BYTE_MESSAGE ) {
                 fwd = new ByteMessage(msg.getMessage().getBytes());
             } else {
-                fwd = XByteBuffer.deserialize(msg.getMessage().getBytes());
+                fwd = 
XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
             }
             //get the actual member with the correct alive time
             Member source = msg.getAddress();

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ReplicationStream.java
 Wed Mar 22 19:59:38 2006
@@ -93,9 +93,7 @@
         return clazz;
     }
 
-    public Class findExternalClass(String name)
-        throws ClassNotFoundException  {
-
+    public Class findExternalClass(String name) throws ClassNotFoundException  
{
         ClassNotFoundException cnfe = null;
         for (int i=0; i<classLoaders.length; i++ ) {
             try {
@@ -107,6 +105,11 @@
         }
         if ( cnfe != null ) throw cnfe;
         else throw new ClassNotFoundException(name);
+    }
+    
+    public void close() throws IOException  {
+        this.classLoaders = null;
+        super.close();
     }
 
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Wed Mar 22 19:59:38 2006
@@ -23,6 +23,7 @@
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.io.ObjectInputStream;
 
 /**
  * The XByteBuffer provides a dual functionality.
@@ -520,18 +521,21 @@
     
     public static Serializable deserialize(byte[] data, int offset, int 
length)  
         throws IOException, ClassNotFoundException, ClassCastException {
-        return deserialize(data,offset,length,new ClassLoader[] 
{XByteBuffer.class.getClassLoader()});     
+        return deserialize(data,offset,length,null);     
     }
-
+    public static int invokecount = 0;
     public static Serializable deserialize(byte[] data, int offset, int 
length, ClassLoader[] cls) 
         throws IOException, ClassNotFoundException, ClassCastException {
+        synchronized (XByteBuffer.class) { invokecount++;}
         Object message = null;
-        if ( cls == null ) cls = new ClassLoader[] 
{XByteBuffer.class.getClassLoader()};
+        if ( cls == null ) cls = new ClassLoader[0];
         if (data != null) {
             InputStream  instream = new 
ByteArrayInputStream(data,offset,length);
-            ReplicationStream stream = new ReplicationStream(instream,cls);
+            ObjectInputStream stream = null;
+            stream = (cls.length>0)? new ReplicationStream(instream,cls):new 
ObjectInputStream(instream);
             message = stream.readObject();
             instream.close();
+            stream.close();
         }
         if ( message == null ) {
             return null;
@@ -553,6 +557,7 @@
         ByteArrayOutputStream outs = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(outs);
         out.writeObject(msg);
+        out.flush();
         byte[] data = outs.toByteArray();
         return data;
     }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/MemberImpl.java
 Wed Mar 22 19:59:38 2006
@@ -23,6 +23,7 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import java.util.Arrays;
+import org.apache.catalina.tribes.tcp.SenderState;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -106,6 +107,17 @@
         this.port = port;
         this.domain = domain.getBytes();
         this.memberAliveTime=aliveTime;
+    }
+    
+    
+    public boolean isReady() {
+        return SenderState.getSenderState(this).isReady();
+    }
+    public boolean isSuspect() {
+        return SenderState.getSenderState(this).isSuspect();
+    }
+    public boolean isFailing() {
+        return SenderState.getSenderState(this).isFailing();
     }
 
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
 Wed Mar 22 19:59:38 2006
@@ -46,18 +46,17 @@
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class);
     
     private MessageListener listener;
-    private String host;
+    private String host = "auto";
     private InetAddress bind;
-    private int port;
+    private int port  = 4000;
     private int rxBufSize = 43800;
     private int txBufSize = 25188;
     private boolean listen = false;
     private ThreadPool pool;
     private boolean direct = true;
-    private long tcpSelectorTimeout;
-    private String tcpListenAddress;
+    private long tcpSelectorTimeout = 100;
     //how many times to search for an available socket
-    private int autoBind = 1;
+    private int autoBind = 10;
     private int maxThreads = 25;
     private int minThreads = 6;
 
@@ -233,7 +232,7 @@
     }
 
     public String getTcpListenAddress() {
-        return tcpListenAddress;
+        return getHost();
     }
 
     public int getAutoBind() {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
 Wed Mar 22 19:59:38 2006
@@ -155,13 +155,18 @@
                  * This is considered an asynchronized request
                  */
                 if (ClusterData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,channel);
-                //process the message
-                getCallback().messageDataReceived(msgs[i]);
+                try {
+                    //process the message
+                    getCallback().messageDataReceived(msgs[i]);
+                }catch ( Exception e ) {
+                    log.error("Processing of cluster message failed.",e);
+                } 
                 /**
                  * Use send ack here if you want the request to complete on 
this 
                  * server before sending the ack to the remote server
                  * This is considered a synchronized request
                  */
+                
                 if (ClusterData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel);
             }                        
         }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 Wed Mar 22 19:59:38 2006
@@ -94,6 +94,7 @@
      * @param mapContextName String - unique name for this map, to allow 
multiple maps per channel
      * @param initialCapacity int - the size of this map, see HashMap
      * @param loadFactor float - load factor, see HashMap
+     * @param cls - a list of classloaders to be used for deserialization of 
objects.
      */
     public AbstractReplicatedMap(Object owner,
                                  Channel channel, 
@@ -101,9 +102,10 @@
                                  String mapContextName, 
                                  int initialCapacity,
                                  float loadFactor,
-                                 int channelSendOptions) {
+                                 int channelSendOptions,
+                                 ClassLoader[] cls) {
         super(initialCapacity, loadFactor);
-        init(owner, channel, mapContextName, timeout, channelSendOptions);
+        init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
         
     }
 
@@ -111,9 +113,9 @@
         return new Member[] {m};
     }
 
-    private void init(Object owner, Channel channel, String mapContextName, 
long timeout, int channelSendOptions) {
+    private void init(Object owner, Channel channel, String mapContextName, 
long timeout, int channelSendOptions,ClassLoader[] cls) {
         this.mapOwner = owner;
-        
+        this.externalLoaders = cls;
         this.channelSendOptions = channelSendOptions;
         this.channel = channel;
         this.rpcTimeout = timeout;
@@ -143,7 +145,7 @@
             MapMessage msg = new MapMessage(this.mapContextName, msgtype,
                                             false, null, null, null, 
wrap(channel.getLocalMember(false)));
             if ( rpc) {
-                Response[] resp = rpcChannel.send(channel.getMembers(), msg, 
rpcChannel.FIRST_REPLY, channelSendOptions,rpcTimeout);
+                Response[] resp = rpcChannel.send(channel.getMembers(), msg, 
rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
                 for (int i = 0; i < resp.length; i++) {
                     mapMemberAdded(resp[i].getSource());
                     messageReceived(resp[i].getMessage(), resp[i].getSource());
@@ -233,7 +235,7 @@
             }
             try {
                 if ( entry.getBackupNodes()!= null && 
entry.getBackupNodes().length > 0 ) {
-                    channel.send(entry.getBackupNodes(), msg, 
channel.SEND_OPTIONS_DEFAULT);
+                    channel.send(entry.getBackupNodes(), msg, 
channelSendOptions);
                 }
             } catch (ChannelException x) {
                 log.error("Unable to replicate data.", x);
@@ -266,36 +268,22 @@
                                                 null, null, null, null);
                 Response[] resp = rpcChannel.send(new Member[] {backup}, msg, 
rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
                 if (resp.length > 0) {
-                    msg = (MapMessage) resp[0].getMessage();
-                    ArrayList list = (ArrayList) msg.getValue();
-                    for (int i = 0; i < list.size(); i++) {
-                        
messageReceived((Serializable)list.get(i),resp[0].getSource());
-//                        MapMessage m = (MapMessage) list.get(i);
-//                        try {
-//                            m.deserialize(getExternalLoaders());
-//                            //make sure we don't store that actual object as 
primary or backup
-//                            MapEntry local = (MapEntry)super.get(m.getKey());
-//                            if (local != null && (!local.isProxy())) 
continue;
-//
-//                            //store the object
-//                            if (m.getValue()!=null && m.getValue() 
instanceof ReplicatedMapEntry ) {
-//                                
((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner());
-//                            }
-//                            MapEntry entry = new MapEntry(m.getKey(), 
m.getValue());
-//                            entry.setBackup(false);
-//                            entry.setProxy(true);
-//                            entry.setBackupNodes(m.getBackupNodes());
-//                            super.put(entry.getKey(), entry);
-//                        } catch (IOException x) {
-//                            log.error("Unable to deserialize MapMessage.", 
x);
-//                        } catch (ClassNotFoundException x) {
-//                            log.error("Unable to deserialize MapMessage.", 
x);
-//                        }
-                    }//for
+                    synchronized (stateMutex) {
+                        msg = (MapMessage) resp[0].getMessage();
+                        msg.deserialize(getExternalLoaders());
+                        ArrayList list = (ArrayList) msg.getValue();
+                        for (int i = 0; i < list.size(); i++) {
+                            messageReceived( (Serializable) list.get(i), 
resp[0].getSource());
+                        } //for
+                    }
                 }
             }
         } catch (ChannelException x) {
             log.error("Unable to transfer LazyReplicatedMap state.", x);
+        } catch (IOException x) {
+            log.error("Unable to transfer LazyReplicatedMap state.", x);
+        } catch (ClassNotFoundException x) {
+            log.error("Unable to transfer LazyReplicatedMap state.", x);
         }
         stateTransferred = true;
     }
@@ -333,12 +321,12 @@
                     Map.Entry e = (Map.Entry) i.next();
                     MapEntry entry = (MapEntry) e.getValue();
                     MapMessage me = new MapMessage(mapContextName, 
MapMessage.MSG_PROXY,
-                        false, (Serializable) entry.getKey(), (Serializable) 
entry.getValue(),
-                        null, entry.getBackupNodes());
+                        false, (Serializable) entry.getKey(), null,null, 
entry.getBackupNodes());
                     list.add(me);
                 }
                 mapmsg.setValue(list);
                 return mapmsg;
+                
             } //synchronized
         }
 
@@ -426,15 +414,15 @@
                             diff.unlock();
                         }
                     } else {
-                        entry.setValue(mapmsg.getValue());
+                        if ( mapmsg.getValue()!=null ) 
entry.setValue(mapmsg.getValue());
                         diff.setOwner(getMapOwner());
                     } //end if
                 } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                     ReplicatedMapEntry re = 
(ReplicatedMapEntry)mapmsg.getValue();
                     re.setOwner(getMapOwner());
-                    entry.setValue(mapmsg.getValue());
+                    if ( mapmsg.getValue()!=null ) 
entry.setValue(mapmsg.getValue());
                 } else {
-                    entry.setValue(mapmsg.getValue());
+                    if ( mapmsg.getValue()!=null ) 
entry.setValue(mapmsg.getValue());
                 } //end if
             } //end if
             super.put(entry.getKey(), entry);
@@ -459,7 +447,8 @@
             while (i.hasNext()) {
                 Map.Entry e = (Map.Entry) i.next();
                 MapEntry entry = (MapEntry) e.getValue();
-                if (entry.isPrimary() && entry.getBackupNodes() == null && 
entry.getBackupNodes().length == 0) {
+                if ( entry == null ) continue;
+                if (entry.isPrimary() && (entry.getBackupNodes() == null || 
entry.getBackupNodes().length == 0)) {
                     try {
                         Member[] backup = publishEntryInfo(entry.getKey(), 
entry.getValue());
                         entry.setBackupNodes(backup);
@@ -473,6 +462,7 @@
     }
     
     public boolean inSet(Member m, Member[] set) {
+        if ( set == null ) return false;
         boolean result = false;
         for (int i=0; i<set.length && (!result); i++ )
             if ( m.equals(set[i]) ) result = true;
@@ -484,8 +474,6 @@
     }
 
     public void memberDisappeared(Member member) {
-        Exception ex = new Exception("[DEBUG] Removing 
member:"+member.getName());
-        ex.printStackTrace();
         synchronized (mapMembers) {
             mapMembers.remove(member);
         }
@@ -527,7 +515,7 @@
     protected void printMap(String header) {
         try {
             System.out.println("\nDEBUG MAP:"+header);
-            System.out.println("Map["+((Object)this).toString()+"; " + new 
String(mapContextName, chset) + ", Map Size:" + super.size());
+            System.out.println("Map["+ new String(mapContextName, chset) + ", 
Map Size:" + super.size());
             Member[] mbrs = getMapMembers();
             for ( int i=0; i<mbrs.length;i++ ) {
                 System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
@@ -658,7 +646,7 @@
 //                map message to send to and from other maps
 
//------------------------------------------------------------------------------
 
-    public static class MapMessage implements Externalizable {
+    public static class MapMessage implements Serializable {
         public static final int MSG_BACKUP = 1;
         public static final int MSG_RETRIEVE_BACKUP = 2;
         public static final int MSG_PROXY = 3;
@@ -670,8 +658,8 @@
         private byte[] mapId;
         private int msgtype;
         private boolean diff;
-        private Serializable key;
-        private Serializable value;
+        private transient Serializable key;
+        private transient Serializable value;
         private byte[] valuedata;
         private byte[] keydata;
         private byte[] diffvalue;
@@ -681,7 +669,7 @@
 
         public MapMessage(byte[] mapId,int msgtype, boolean diff,
                           Serializable key, Serializable value,
-                          byte[] diffvalue, Member[] nodes) {
+                          byte[] diffvalue, Member[] nodes)  {
             this.mapId = mapId;
             this.msgtype = msgtype;
             this.diff = diff;
@@ -689,6 +677,8 @@
             this.value = value;
             this.diffvalue = diffvalue;
             this.nodes = nodes;
+            setValue(value);
+            setKey(key);
         }
         
         public void deserialize(ClassLoader[] cls) throws IOException, 
ClassNotFoundException {
@@ -717,6 +707,7 @@
             if ( key!=null ) return key;
             if ( keydata == null || keydata.length == 0 ) return null;
             key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
+            keydata = null;
             return key;
         }
         
@@ -737,6 +728,7 @@
             if ( value!=null ) return value;
             if ( valuedata == null || valuedata.length == 0 ) return null;
             value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
+            valuedata = null;;
             return value;
         }
         
@@ -761,7 +753,21 @@
         }
 
         public void setValue(Serializable value) {
-            this.value = value;
+            try {
+                if ( value != null ) valuedata = XByteBuffer.serialize(value);
+                this.value = value;
+            }catch ( IOException x ) {
+                throw new RuntimeException(x);
+            }
+        }
+        
+        public void setKey(Serializable key) {
+            try {
+                if (key != null) keydata = XByteBuffer.serialize(key);
+                this.key = key;
+            } catch (IOException x) {
+                throw new RuntimeException(x);
+            }
         }
         
         protected Member[] readMembers(ObjectInput in) throws IOException, 
ClassNotFoundException {
@@ -775,51 +781,6 @@
             return members;
         }
         
-        protected byte[] readBytes(ObjectInput in) throws IOException {
-            byte[] data = new byte[in.readInt()];
-            in.read(data);
-            return data;
-        }
-
-        public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            mapId = new byte[in.readInt()];
-            in.read(mapId);
-            msgtype = in.readInt();
-            switch (msgtype) {
-                case MSG_BACKUP:
-                case MSG_STATE: {
-                    diff = in.readBoolean();
-                    keydata = readBytes(in);
-                    if (diff) {
-                        diffvalue = readBytes(in);
-                    } else {
-                        valuedata = readBytes(in);
-                    } //endif
-                    nodes = readMembers(in);
-                    break;
-                }
-                case MSG_RETRIEVE_BACKUP: {
-                    keydata = readBytes(in);
-                    valuedata = readBytes(in);
-                    break;
-                }
-                case MSG_REMOVE: {
-                    keydata = readBytes(in);
-                    break;
-                }
-                case MSG_PROXY: {
-                    keydata = readBytes(in);
-                    this.nodes = readMembers(in);
-                    break;
-                }
-                case MSG_START:
-                case MSG_STOP: {
-                        nodes = readMembers(in);
-                        break;
-                }
-            } //switch
-        } //readExternal
-        
         protected void writeMembers(ObjectOutput out,Member[] members) throws 
IOException {
             if ( members == null ) members = new Member[0];
             out.writeInt(members.length);
@@ -832,55 +793,6 @@
             }
         }
         
-        protected void writeBytes(ObjectOutput out, byte[] data) throws 
IOException {
-            out.writeInt(data.length);
-            out.write(data);
-        }
-        
-        protected void writeObject(ObjectOutput out, Serializable o) throws 
IOException {
-            byte[] data = XByteBuffer.serialize(o);
-            writeBytes(out,data);
-        }
-        
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(mapId.length);
-            out.write(mapId);
-            out.writeInt(msgtype);
-            switch (msgtype) {
-                case MSG_BACKUP:
-                case MSG_STATE: {
-                    out.writeBoolean(diff);
-                    writeObject(out,key);
-                    if (diff) {
-                        out.writeInt(diffvalue.length);
-                        out.write(diffvalue);
-                    } else {
-                        writeObject(out,value);
-                    } //endif
-                    writeMembers(out,nodes);
-                    break;
-                }
-                case MSG_RETRIEVE_BACKUP: {
-                    writeObject(out,key);
-                    writeObject(out,value);
-                    break;
-                }
-                case MSG_REMOVE: {
-                    writeObject(out,key);
-                    break;
-                }
-                case MSG_PROXY: {
-                    writeObject(out,key);
-                    writeMembers(out,nodes);
-                    break;
-                }
-                case MSG_START:
-                case MSG_STOP: {
-                    writeMembers(out,nodes);
-                    break;
-                }
-            } //switch
-        } //writeExternal
         
         /**
          * shallow clone
@@ -927,12 +839,20 @@
         return externalLoaders;
     }
 
+    public int getChannelSendOptions() {
+        return channelSendOptions;
+    }
+
     public void setMapOwner(Object mapOwner) {
         this.mapOwner = mapOwner;
     }
 
     public void setExternalLoaders(ClassLoader[] externalLoaders) {
         this.externalLoaders = externalLoaders;
+    }
+
+    public void setChannelSendOptions(int channelSendOptions) {
+        this.channelSendOptions = channelSendOptions;
     }
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Wed Mar 22 19:59:38 2006
@@ -84,8 +84,8 @@
          * @param initialCapacity int - the size of this map, see HashMap
          * @param loadFactor float - load factor, see HashMap
          */
-        public LazyReplicatedMap(Object owner, Channel channel, long timeout, 
String mapContextName, int initialCapacity, float loadFactor) {
-            
super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, 
Channel.SEND_OPTIONS_DEFAULT);
+        public LazyReplicatedMap(Object owner, Channel channel, long timeout, 
String mapContextName, int initialCapacity, float loadFactor, ClassLoader[] 
cls) {
+            
super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, 
Channel.SEND_OPTIONS_DEFAULT,cls);
         }
 
         /**
@@ -95,8 +95,8 @@
          * @param mapContextName String - unique name for this map, to allow 
multiple maps per channel
          * @param initialCapacity int - the size of this map, see HashMap
          */
-        public LazyReplicatedMap(Object owner, Channel channel, long timeout, 
String mapContextName, int initialCapacity) {
-            super(owner, channel,timeout,mapContextName,initialCapacity, 
LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
+        public LazyReplicatedMap(Object owner, Channel channel, long timeout, 
String mapContextName, int initialCapacity, ClassLoader[] cls) {
+            super(owner, channel,timeout,mapContextName,initialCapacity, 
LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
         }
 
         /**
@@ -105,8 +105,8 @@
          * @param timeout long - timeout for RPC messags
          * @param mapContextName String - unique name for this map, to allow 
multiple maps per channel
          */
-        public LazyReplicatedMap(Object owner, Channel channel, long timeout, 
String mapContextName) {
-            super(owner, channel,timeout,mapContextName, 
LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
+        public LazyReplicatedMap(Object owner, Channel channel, long timeout, 
String mapContextName, ClassLoader[] cls) {
+            super(owner, channel,timeout,mapContextName, 
LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT,
 cls);
         }
 
 
@@ -166,7 +166,7 @@
                         ReplicatedMapEntry val = 
(ReplicatedMapEntry)entry.getValue();
                         val.setOwner(getMapOwner());
                     }
-                    entry.setValue(msg.getValue());
+                    if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
                 }
                 if (entry.isBackup()) {
                     //select a new backup node
@@ -203,7 +203,6 @@
 
     
     public Object put(Object key, Object value) {
-        System.out.println("Adding session id:"+key);
         if ( !(key instanceof Serializable) ) throw new 
IllegalArgumentException("Key is not serializable:"+key.getClass().getName());
         if ( value == null ) return remove(key);
         if ( !(value instanceof Serializable) ) throw new 
IllegalArgumentException("Value is not 
serializable:"+value.getClass().getName());
@@ -247,8 +246,9 @@
      */
     public Object remove(Object key) {
         MapEntry entry = (MapEntry)super.remove(key);
-        MapMessage msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
+        
         try {
+            MapMessage msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
             getChannel().send(getMapMembers(), 
msg,Channel.SEND_OPTIONS_DEFAULT);
         } catch ( ChannelException x ) {
             log.error("Unable to replicate out data for a 
LazyReplicatedMap.remove operation",x);
@@ -294,6 +294,10 @@
         return super.keySet();
     }
     
+    public int sizeFull() {
+        return super.size();
+    }
+
     public Set entrySet() {
         LinkedHashSet set = new LinkedHashSet(super.size());
         Iterator i = super.entrySet().iterator();
@@ -318,9 +322,6 @@
         return Collections.unmodifiableSet(set);
     }
     
-    public int sizeFull() {
-        return super.size();
-    }
     
     public int size() {
         //todo, implement a counter variable instead
@@ -330,7 +331,7 @@
         while ( i.hasNext() ) {
             Map.Entry e = (Map.Entry)i.next();
             MapEntry entry = (MapEntry)e.getValue();
-            if ( entry.isPrimary() ) counter++;
+            if ( entry.isPrimary() && entry.getValue()!=null ) counter++;
         }
         return counter;
     }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
 Wed Mar 22 19:59:38 2006
@@ -50,6 +50,9 @@
  * @todo implement periodic sync/transfer thread
  * @author Filip Hanik
  * @version 1.0
+ * 
+ * @todo memberDisappeared, should do nothing except change map membership
+ *       by default it relocates the primary objects
  */
 public class ReplicatedMap extends AbstractReplicatedMap implements 
RpcCallback, ChannelListener, MembershipListener {
 
@@ -66,8 +69,8 @@
      * @param initialCapacity int - the size of this map, see HashMap
      * @param loadFactor float - load factor, see HashMap
      */
-    public ReplicatedMap(Object owner, Channel channel, long timeout, String 
mapContextName, int initialCapacity,float loadFactor) {
-        super(owner,channel, timeout, mapContextName, initialCapacity, 
loadFactor, Channel.SEND_OPTIONS_DEFAULT);
+    public ReplicatedMap(Object owner, Channel channel, long timeout, String 
mapContextName, int initialCapacity,float loadFactor, ClassLoader[] cls) {
+        super(owner,channel, timeout, mapContextName, initialCapacity, 
loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
     }
 
     /**
@@ -77,8 +80,8 @@
      * @param mapContextName String - unique name for this map, to allow 
multiple maps per channel
      * @param initialCapacity int - the size of this map, see HashMap
      */
-    public ReplicatedMap(Object owner, Channel channel, long timeout, String 
mapContextName, int initialCapacity) {
-        super(owner,channel, timeout, mapContextName, initialCapacity, 
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
+    public ReplicatedMap(Object owner, Channel channel, long timeout, String 
mapContextName, int initialCapacity, ClassLoader[] cls) {
+        super(owner,channel, timeout, mapContextName, initialCapacity, 
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT, cls);
     }
 
     /**
@@ -87,8 +90,8 @@
      * @param timeout long - timeout for RPC messags
      * @param mapContextName String - unique name for this map, to allow 
multiple maps per channel
      */
-    public ReplicatedMap(Object owner, Channel channel, long timeout, String 
mapContextName) {
-        super(owner, channel, timeout, 
mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, 
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
+    public ReplicatedMap(Object owner, Channel channel, long timeout, String 
mapContextName, ClassLoader[] cls) {
+        super(owner, channel, timeout, 
mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, 
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
     }
 
 
//------------------------------------------------------------------------------

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 Wed Mar 22 19:59:38 2006
@@ -79,6 +79,10 @@
                            long timeout) throws ChannelException {
         
         if ( destination==null || destination.length == 0 ) return new 
Response[0];
+        
+        //avoid dead lock
+        channelOptions = channelOptions & 
~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+        
         RpcCollectorKey key = new 
RpcCollectorKey(UUIDGenerator.randomUUID(false));
         RpcCollector collector = new 
RpcCollector(key,rpcOptions,destination.length,timeout);
         try {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
 Wed Mar 22 19:59:38 2006
@@ -244,7 +244,8 @@
 
     
     
-    public static class LoadMessage extends ByteMessage implements 
Serializable  {
+    //public static class LoadMessage extends ByteMessage implements 
Serializable  {
+    public static class LoadMessage implements Serializable  {
         
         public static byte[] outdata = new byte[size];
         public static Random r = new Random(System.currentTimeMillis());

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
 Wed Mar 22 19:59:38 2006
@@ -45,10 +45,14 @@
     protected SimpleTableDemo table;
     
     public MapDemo(Channel channel ) {
-        map = new LazyReplicatedMap(null,channel,5000, "MapDemo");
+        map = new LazyReplicatedMap(null,channel,5000, "MapDemo",null);
         table = 
SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
         channel.addChannelListener(this);
         channel.addMembershipListener(this);
+        for ( int i=0; i<1000; i++ ) {
+            map.put("MyKey-"+i,"My String Value-"+i);
+        }
+        this.messageReceived(null,null);
     }
     
     public boolean accept(Serializable msg, Member source) {

Modified: 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/BackupManager.java
 Wed Mar 22 19:59:38 2006
@@ -28,6 +28,7 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.ReplicationStream;
 import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+import org.apache.catalina.tribes.Channel;
 
 /**
  [EMAIL PROTECTED] Filip Hanik
@@ -56,7 +57,10 @@
      * Should listeners be notified?
      */
     private boolean notifyListenersOnReplication;
-
+    /**
+     * 
+     */
+    private int mapSendOptions = 
Channel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK|Channel.SEND_OPTIONS_USE_ACK;
 
     /**
      * Constructor, just calls super()
@@ -139,7 +143,7 @@
         ClassLoader classLoader = null;
         if (container != null) loader = container.getLoader();
         if (loader != null) classLoader = loader.getClassLoader();
-        else classLoader = Thread.currentThread().getContextClassLoader();
+        if ( classLoader == null ) classLoader = 
Thread.currentThread().getContextClassLoader();
         if ( classLoader == Thread.currentThread().getContextClassLoader() ) {
             return new ClassLoader[] {classLoader};
         } else {
@@ -189,8 +193,9 @@
             LazyReplicatedMap map = new LazyReplicatedMap(this,
                                                           
catclust.getChannel(),
                                                           DEFAULT_REPL_TIMEOUT,
-                                                          getMapName());
-            map.setExternalLoaders(getClassLoaders());
+                                                          getMapName(),
+                                                          getClassLoaders());
+            map.setChannelSendOptions(mapSendOptions);
             this.sessions = map;
             super.start();
         }  catch ( Exception x ) {
@@ -246,6 +251,9 @@
         this.notifyListenersOnReplication = notifyListenersOnReplication;
     }
 
+    public void setMapSendOptions(int mapSendOptions) {
+        this.mapSendOptions = mapSendOptions;
+    }
 
     /* 
      * @see org.apache.catalina.ha.ClusterManager#getCluster()
@@ -253,7 +261,11 @@
     public CatalinaCluster getCluster() {
         return cluster;
     }
-    
+
+    public int getMapSendOptions() {
+        return mapSendOptions;
+    }
+
     public String[] getInvalidatedSessions() {
         return new String[0];
     }

Modified: 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java?rev=388020&r1=388019&r2=388020&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/ha/session/DeltaRequest.java
 Wed Mar 22 19:59:38 2006
@@ -159,16 +159,22 @@
             switch ( info.getType() ) {
                 case TYPE_ATTRIBUTE: {
                     if ( info.getAction() == ACTION_SET ) {
+                        if ( log.isTraceEnabled() ) 
log.trace("Session.setAttribute('"+info.getName()+"', '"+info.getValue()+"')");
                         session.setAttribute(info.getName(), 
info.getValue(),notifyListeners,false);
-                    }  else
+                    }  else {
+                        if ( log.isTraceEnabled() ) 
log.trace("Session.removeAttribute('"+info.getName()+"')");
                         
session.removeAttribute(info.getName(),notifyListeners,false);
+                    }
+                        
                     break;
                 }//case
                 case TYPE_ISNEW: {
+                if ( log.isTraceEnabled() ) 
log.trace("Session.setNew('"+info.getValue()+"')");
                     
session.setNew(((Boolean)info.getValue()).booleanValue(),false);
                     break;
                 }//case
                 case TYPE_MAXINTERVAL: {
+                    if ( log.isTraceEnabled() ) 
log.trace("Session.setMaxInactiveInterval('"+info.getValue()+"')");
                     
session.setMaxInactiveInterval(((Integer)info.getValue()).intValue(),false);
                     break;
                 }//case
@@ -341,28 +347,30 @@
             return other.getName().equals(this.getName());
         }
         
-        public synchronized void readExternal(java.io.ObjectInput in ) throws 
java.io.IOException,
-            java.lang.ClassNotFoundException {
+        public synchronized void readExternal(java.io.ObjectInput in ) throws 
IOException,ClassNotFoundException {
             //type - int
             //action - int
             //name - String
+            //hasvalue - boolean
             //value - object
             type = in.readInt();
             action = in.readInt();
             name = in.readUTF();
-            value = in.readObject();
+            boolean hasValue = in.readBoolean();
+            if ( hasValue ) value = in.readObject();
         }
 
-        public synchronized void writeExternal(java.io.ObjectOutput out) 
throws java.io.
-            IOException {
+        public synchronized void writeExternal(java.io.ObjectOutput out) 
throws IOException {
             //type - int
             //action - int
             //name - String
+            //hasvalue - boolean
             //value - object
             out.writeInt(getType());
             out.writeInt(getAction());
             out.writeUTF(getName());
-            out.writeObject(getValue());
+            out.writeBoolean(getValue()!=null);
+            if (getValue()!=null) out.writeObject(getValue());
         }
         
         public String toString() {



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to