Author: fhanik
Date: Thu Mar  9 18:04:49 2006
New Revision: 384676

URL: http://svn.apache.org/viewcvs?rev=384676&view=rev
Log:
Working on the replicated map

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384676&r1=384675&r2=384676&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Thu Mar  9 18:04:49 2006
@@ -30,14 +30,26 @@
 import org.apache.catalina.tribes.ChannelListener;
 import java.util.Collection;
 import org.apache.catalina.tribes.MembershipListener;
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.catalina.tribes.mcast.McastMember;
+import java.util.Iterator;
+import org.apache.catalina.tribes.ChannelException;
+import java.util.LinkedList;
+import java.util.LinkedHashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
+ * @todo implement periodic sync/transfer 
  * @author Filip Hanik
  * @version 1.0
  */
 public class LazyReplicatedMap extends LinkedHashMap 
     implements RpcCallback, ChannelListener, MembershipListener {
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class);
+    protected static long TIME_OUT = 15000;//hard coded timeout
     
 
//------------------------------------------------------------------------------
    
 //              INSTANCE VARIABLES
@@ -45,6 +57,7 @@
 
     private Channel channel;
     private RpcChannel rpcChannel;
+    private byte[] mapContextName;
     
     
 
//------------------------------------------------------------------------------
    
@@ -69,13 +82,15 @@
         final String chset = "ISO-8859-1";
         this.channel = channel;
         try {
-            this.rpcChannel = new RpcChannel(mapContextName.getBytes(chset), 
channel, this);
+            this.mapContextName = mapContextName.getBytes(chset);
         }catch (UnsupportedEncodingException x) {
             log.warn("Unable to encode mapContextName["+mapContextName+"] 
using getBytes("+chset+") using default getBytes()",x);
-            this.rpcChannel = new RpcChannel(mapContextName.getBytes(), 
channel, this);
+            this.mapContextName = mapContextName.getBytes();
         }
+        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
         this.channel.addChannelListener(this);
         this.channel.addMembershipListener(this);
+        transferState();
         
     }
     
@@ -93,18 +108,30 @@
         }
         this.rpcChannel = null;
         this.channel = null;
+        super.clear();
     }
     
 
//------------------------------------------------------------------------------
    
 //              GROUP COM INTERFACES
 
//------------------------------------------------------------------------------
   
+    public void transferState() {
+        throw new UnsupportedOperationException();
+    }
+    
     /**
-     * 
+     * @todo implement state transfer
      * @param msg Serializable
      * @return Serializable - null if no reply should be sent
      */
     public Serializable replyRequest(Serializable msg, Member sender) {
-        throw new UnsupportedOperationException();
+        if ( !(msg instanceof MapMessage) ) return null;
+        MapMessage mapmsg = (MapMessage)msg;
+        if ( mapmsg.getMsgType() != mapmsg.MSG_RETRIEVE_BACKUP ) return null;
+        
+        MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+        if ( entry == null ) return null;
+        mapmsg.setValue((Serializable)entry.getValue());
+        return mapmsg;
     }
 
     /**
@@ -114,30 +141,112 @@
      * @param sender Member
      */
     public void leftOver(Serializable msg, Member sender) {
-        throw new UnsupportedOperationException();
+        //ignore left over responses
     }
 
     public void messageReceived(Serializable msg, Member sender) {
         throw new UnsupportedOperationException();
+        //todo implement all the messages that we can receive
     }
     
     public boolean accept(Serializable msg, Member sender) {
-        throw new UnsupportedOperationException();
+        if ( msg instanceof MapMessage ) {
+            return Arrays.equals(mapContextName,((MapMessage)msg).getMapId());
+        }
+        return false;
     }
     
     public void memberAdded(Member member) {
-        
+        //do nothing, we don't care
     }
     public void memberDisappeared(Member member) {
-        
+        //todo move all sessions that are primary here to and have this member 
as 
+        //a backup
+        Iterator i = super.entrySet().iterator();
+        while ( i.hasNext() ) {
+            Map.Entry e = (Map.Entry)i.next();
+            MapEntry entry = (MapEntry)e.getValue();
+            if ( entry.isPrimary() && member.equals(entry.getBackupNode())) {
+                try {
+                    Member backup = publishEntryInfo(entry.getKey(), 
entry.getValue());
+                    entry.setBackupNode(backup);
+                }catch ( ChannelException x ) {
+                    log.error("Unable to relocate["+entry.getKey()+"] to a new 
backup node",x);
+                }
+            }//end if
+        }//while
+    }
+    
+    int currentNode = 0;
+    public Member getNextBackupNode() {
+        Member[] members = channel.getMembers();
+        if ( members.length == 0 ) return null;
+        int node = currentNode++;
+        if ( node >= members.length ) {
+            node = 0;
+            currentNode = 0;
+        }
+        return members[node];
     }
     
+    
+    
 
//------------------------------------------------------------------------------
    
 //              METHODS TO OVERRIDE    
 
//------------------------------------------------------------------------------
-
+    /**
+     * publish info about a map pair (key/value) to other nodes in the cluster
+     * @param key Object
+     * @param value Object
+     * @return Member
+     * @throws ChannelException
+     */
+    protected Member publishEntryInfo(Object key, Object value) throws 
ChannelException {
+        //select a backup node
+        Member backup = getNextBackupNode();
+        //publish the data out to all nodes
+        MapMessage msg = new MapMessage(this.mapContextName, 
MapMessage.MSG_PROXY, false,
+                                        (Serializable) key, null, null, 
backup);
+        channel.send(channel.getMembers(), msg);
+
+        //publish the backup data to one node
+        msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false,
+                             (Serializable) key, (Serializable) value, null, 
backup);
+        channel.send(new Member[] {backup}, msg);
+        return backup;
+    }
+    
     public Object get(Object key) {
-        return super.get(key);
+        MapEntry entry = (MapEntry)super.get(key);
+        if ( entry == null ) return null;
+        if ( !entry.isPrimary() ) {
+            try {
+                MapMessage msg = new MapMessage(mapContextName, 
MapMessage.MSG_RETRIEVE_BACKUP, false,
+                                                (Serializable) key, null, 
null, null);
+                Response[] resp = rpcChannel.send(new Member[] 
{entry.getBackupNode()},
+                                                  msg, 
this.rpcChannel.FIRST_REPLY, TIME_OUT);
+                if (resp == null || resp.length == 0) {
+                    //no responses
+                    log.warn("Unable to retrieve object for key:" + key);
+                    return null;
+                }
+                msg = (MapMessage) resp[0].getMessage();
+                
+                Member backup = entry.getBackupNode();
+                if (entry.isBackup()) {
+                    //select a new backup node
+                    backup = publishEntryInfo(key, msg.getValue());
+                }
+                entry.setBackupNode(backup);
+                entry.setBackup(false);
+                entry.setProxy(false);
+                entry.setValue(msg.getValue());
+            } catch (ChannelException x) {
+                log.error("Unable to replicate out data for a 
LazyReplicatedMap.get operation", x);
+                return null;
+            }
+        }
+        return entry.getValue();
     }
 
     public boolean containsKey(Object key) {
@@ -145,19 +254,53 @@
     }
 
     public Object put(Object key, Object value) {
-        return super.put(key,value);
+        if ( !(key instanceof Serializable) ) throw new 
IllegalArgumentException("Key is not serializable:"+key.getClass().getName());
+        if ( value == null ) return remove(key);
+        if ( !(value instanceof Serializable) ) throw new 
IllegalArgumentException("Value is not 
serializable:"+value.getClass().getName());
+
+        MapEntry entry = new MapEntry((Serializable)key,(Serializable)value);
+        entry.setBackup(false);
+        entry.setProxy(false);
+        
+        Object old = null;
+        
+        //make sure that any old values get removed
+        if ( containsKey(key) ) old = (MapEntry)remove(key);
+        try {
+            Member backup = publishEntryInfo(key, value);
+            entry.setBackupNode(backup);
+        } catch (ChannelException x) {
+            log.error("Unable to replicate out data for a 
LazyReplicatedMap.put operation", x);
+        }
+        super.put(key,entry);
+        return old;
     }
 
+    
+
     public void putAll(Map m) {
-        super.putAll(m);
+        Iterator i = m.entrySet().iterator();
+        while ( i.hasNext() ) {
+            Map.Entry entry = (Map.Entry)i.next();
+            put(entry.getKey(),entry.getValue());
+        }
     }
 
     public Object remove(Object key) {
-        return super.remove(key);
+        MapEntry entry = (MapEntry)super.remove(key);
+        MapMessage msg = new 
MapMessage(mapContextName,MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
+        try {
+            channel.send(channel.getMembers(), msg);
+        } catch ( ChannelException x ) {
+            log.error("Unable to replicate out data for a 
LazyReplicatedMap.remove operation",x);
+        }
+        return entry!=null?entry.getValue():null;
     }
 
     public void clear() {
-        super.clear();
+        //only delete active keys
+        Iterator keys = keySet().iterator();
+        while ( keys.hasNext() ) remove(keys.next());
     }
 
     public boolean containsValue(Object value) {
@@ -165,19 +308,44 @@
     }
 
     public Object clone() {
-        return super.clone();
+        throw new UnsupportedOperationException("This operation is not valid 
on a replicated map");
     }
     
     public Set entrySet() {
-        return super.entrySet();
+        LinkedHashSet set = new LinkedHashSet(super.size());
+        Iterator i = super.entrySet().iterator();
+        while ( i.hasNext() ) {
+            Map.Entry e = (Map.Entry)i.next();
+            MapEntry entry = (MapEntry)e.getValue();
+            if ( entry.isPrimary() ) set.add(entry.getValue());
+        }
+        return set;
     }
     
     public Set keySet() {
-        return super.keySet();
+        //todo implement
+        //should only return keys where this is active.
+        LinkedHashSet set = new LinkedHashSet(super.size());
+        Iterator i = super.entrySet().iterator();
+        while ( i.hasNext() ) {
+            Map.Entry e = (Map.Entry)i.next();
+            MapEntry entry = (MapEntry)e.getValue();
+            if ( entry.isPrimary() ) set.add(entry.getKey());
+        }
+        return set;
     }
     
     public int size() {
-        return super.size();
+        //todo, implement a counter variable instead
+        //only count active members in this node
+        int counter = 0;
+        Iterator i = super.entrySet().iterator();
+        while ( i.hasNext() ) {
+            Map.Entry e = (Map.Entry)i.next();
+            MapEntry entry = (MapEntry)e.getValue();
+            if ( entry.isPrimary() ) counter++;
+        }
+        return counter;
     }
     
     protected boolean removeEldestEntry(Map.Entry eldest) {
@@ -185,11 +353,18 @@
     }
     
     public boolean isEmpty() {
-        return super.isEmpty();
+        return size()==0;
     }
     
     public Collection values() {
-        return super.values();
+        ArrayList values = new ArrayList(super.size());
+        Iterator i = super.entrySet().iterator();
+        while ( i.hasNext() ) {
+            Map.Entry e = (Map.Entry)i.next();
+            MapEntry entry = (MapEntry)e.getValue();
+            if ( entry.isPrimary() ) values.add(entry.getValue());
+        }
+        return values;
     }
     
 
@@ -220,6 +395,10 @@
         public boolean isProxy() {
             return proxy;
         }
+        
+        public boolean isPrimary() {
+            return ((!proxy) && (!backup));
+        }
 
         public void setProxy(boolean proxy) {
             this.proxy = proxy;
@@ -237,8 +416,6 @@
             return backupNode;
         }
         
-        
-        
         public Object getValue() {
             return value;
         }
@@ -300,6 +477,131 @@
         }
         
     }
+//------------------------------------------------------------------------------
    
+//                map message to send to and from other maps    
+//------------------------------------------------------------------------------
+    
+    public static class MapMessage implements Externalizable {
+        public static final int MSG_BACKUP = 1;
+        public static final int MSG_RETRIEVE_BACKUP = 2;
+        public static final int MSG_PROXY = 3;
+        public static final int MSG_REMOVE = 4;
+        
+        private byte[] mapId;
+        private int msgtype;
+        private boolean diff;
+        private Serializable key;
+        private Serializable value;
+        private byte[] diffvalue;
+        private Member node;
+        
+        public MapMessage(byte[] mapId,
+                          int msgtype, boolean diff, 
+                          Serializable key,Serializable value,
+                          byte[] diffvalue, Member node) {
+            this.mapId = mapId;
+            this.msgtype = msgtype;
+            this.diff = diff;
+            this.key = key;
+            this.value = value;
+            this.diffvalue = diffvalue;
+            this.node = node;
+        }
+        
+        public int getMsgType() {
+            return msgtype;
+        }
+        
+        public boolean isDiff() {
+            return diff;
+        }
+        
+        public Serializable getKey() {
+            return key;
+        }
+        
+        public Serializable getValue() {
+            return value;
+        }
+        
+        public byte[] getDiffValue() {
+            return diffvalue;
+        }
+        
+        public Member getBackupNode() {
+            return node;
+        }
+        
+        public byte[] getMapId() {
+            return mapId;
+        }
+        
+        public void setValue(Serializable value) {
+            this.value = value;
+        }
+        
+        public void readExternal(ObjectInput in) throws 
IOException,ClassNotFoundException {
+            mapId = new byte[in.readInt()];
+            in.read(mapId);
+            msgtype = in.readInt();
+            switch (msgtype) {
+                case MSG_BACKUP: {
+                    diff = in.readBoolean();
+                    key = (Serializable)in.readObject();
+                    if ( diff ) {
+                        diffvalue = new byte[in.readInt()];
+                        in.read(diffvalue);
+                    } else {
+                        value = (Serializable)in.readObject();
+                    }//endif
+                    break;
+                }
+                case MSG_RETRIEVE_BACKUP:
+                case MSG_REMOVE : {
+                    key = (Serializable)in.readObject();
+                    break;
+                }
+                case MSG_PROXY: {
+                    key = (Serializable)in.readObject();
+                    byte[] d = new byte[in.readInt()];
+                    in.read(d);
+                    node = McastMember.getMember(d);
+                    break;
+                }
+            }//switch
+        }//readExternal
+
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(mapId.length);
+            out.write(mapId);
+            out.writeInt(msgtype);
+            switch (msgtype) {
+                case MSG_BACKUP: {
+                    out.writeBoolean(diff);
+                    out.writeObject(key);
+                    if ( diff ) {
+                        out.writeInt(diffvalue.length);
+                        out.write(diffvalue);
+                    } else {
+                        out.writeObject(value);
+                    }//endif
+                    break;
+                }
+                case MSG_RETRIEVE_BACKUP:
+                case MSG_REMOVE : {
+                    out.writeObject(key);
+                    break;
+                }
+                case MSG_PROXY: {
+                    out.writeObject(key);
+                    byte[] d = ((McastMember)node).getData(false);
+                    out.writeInt(d.length);
+                    out.write(d);
+                    break;
+                }
+            }//switch
+        }//writeExternal
+    }//MapMessage
 
 
//------------------------------------------------------------------------------
    
 //                streamable class    

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384676&r1=384675&r2=384676&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 Thu Mar  9 18:04:49 2006
@@ -74,7 +74,7 @@
     public Response[] send(Member[] destination, 
                            Serializable message,
                            int options, 
-                           long timeout) throws ChannelException, 
InterruptedException {
+                           long timeout) throws ChannelException {
         
         if ( destination==null || destination.length == 0 ) return new 
Response[0];
         RpcCollectorKey key = new 
RpcCollectorKey(UUIDGenerator.randomUUID(false));
@@ -86,6 +86,8 @@
                 channel.send(destination, rmsg);
                 collector.wait(timeout);
             }
+        } catch ( InterruptedException ix ) {
+            throw new ChannelException(ix);
         }finally {
             responseMap.remove(key);
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=384676&r1=384675&r2=384676&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Mar  9 18:04:49 2006
@@ -54,6 +54,12 @@
 
 15. remove DataSenderFactory and DataSender.properties -
     these cause the settings to be hard coded ant not pluggable.
+    
+16. Guaranteed delivery of messages, ie either all get it or none get it.
+    Meaning, that all receivers get it, then wait for a process command.
+
+17. Implement transactions - the ability to start a transaction, send several 
messages,
+                             and then commit the transaction
 
 Tasks Completed
 ===========================================



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to