Author: fhanik
Date: Fri Mar 10 09:24:53 2006
New Revision: 384858

URL: http://svn.apache.org/viewcvs?rev=384858&view=rev
Log:
the map implementation is complete and ready to be tested

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384858&r1=384857&r2=384858&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 10 09:24:53 2006
@@ -15,31 +15,30 @@
  */
 package org.apache.catalina.tribes.tipis;
 
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.catalina.tribes.Channel;
-import java.io.Serializable;
-import org.apache.catalina.tribes.Member;
-import java.io.UnsupportedEncodingException;
+import java.io.Externalizable;
 import java.io.IOException;
-import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import java.util.Set;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelListener;
-import java.util.Collection;
+import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
-import java.io.Externalizable;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.mcast.McastMember;
-import java.util.Iterator;
-import org.apache.catalina.tribes.ChannelException;
-import java.util.LinkedList;
-import java.util.LinkedHashSet;
-import java.util.ArrayList;
-import java.util.Arrays;
 
 /**
  * @todo implement periodic sync/transfer 
@@ -55,9 +54,10 @@
 //              INSTANCE VARIABLES
 //------------------------------------------------------------------------------   
 
-    private Channel channel;
-    private RpcChannel rpcChannel;
-    private byte[] mapContextName;
+    private transient Channel channel;
+    private transient RpcChannel rpcChannel;
+    private transient byte[] mapContextName;
+    private transient boolean stateTransferred = false;
     
     
 //------------------------------------------------------------------------------    
@@ -109,13 +109,36 @@
         this.rpcChannel = null;
         this.channel = null;
         super.clear();
+        this.stateTransferred = false;
     }
     
 //------------------------------------------------------------------------------    
 //              GROUP COM INTERFACES
 //------------------------------------------------------------------------------   
     public void transferState() {
-        throw new UnsupportedOperationException();
+        try {
+            Member backup = channel.getMembers().length>0?channel.getMembers()[0]:null;
+            if ( backup != null ) {
+                MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_STATE,false,
+                                                null,null,null,null);
+                Response[] resp = rpcChannel.send(new Member[] {backup},msg,rpcChannel.FIRST_REPLY,TIME_OUT);
+                if ( resp.length > 0 ) {
+                    msg = (MapMessage)resp[0].getMessage();
+                    ArrayList list = (ArrayList)msg.getValue();
+                    for (int i=0; i<list.size(); i++ ) {
+                        MapMessage m = (MapMessage)list.get(i);
+                        MapEntry entry = new MapEntry(m.getKey(),m.getValue());
+                        entry.setBackup(false);
+                        entry.setProxy(true);
+                        entry.setBackupNode(m.getBackupNode());
+                        super.put(entry.getKey(),entry);
+                    }
+                }
+            }
+        } catch ( ChannelException x ) {
+            log.error("Unable to transfer LazyReplicatedMap state.",x);
+        }
+        stateTransferred = true;
     }
     
     /**
@@ -126,12 +149,33 @@
     public Serializable replyRequest(Serializable msg, Member sender) {
         if ( !(msg instanceof MapMessage) ) return null;
         MapMessage mapmsg = (MapMessage)msg;
-        if ( mapmsg.getMsgType() != mapmsg.MSG_RETRIEVE_BACKUP ) return null;
         
-        MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
-        if ( entry == null ) return null;
-        mapmsg.setValue((Serializable)entry.getValue());
-        return mapmsg;
+        //backup request
+        if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) {
+            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+            if (entry == null)return null;
+            mapmsg.setValue( (Serializable) entry.getValue());
+            return mapmsg;
+        }
+        
+        //state transfer request
+        if ( mapmsg.getMsgType() == mapmsg.MSG_STATE ) {
+            ArrayList list = new ArrayList();
+            Iterator i = super.entrySet().iterator();
+            while (i.hasNext()) {
+                Map.Entry e = (Map.Entry) i.next();
+                MapEntry entry = (MapEntry) e.getValue();
+                MapMessage me = new MapMessage(mapContextName,MapMessage.MSG_PROXY,
+                                               false,(Serializable)entry.getKey(),(Serializable)entry.getValue(),
+                                               null,entry.getBackupNode());
+                list.add(me);
+            }
+            mapmsg.setValue(list);
+            return mapmsg;
+        }
+        
+        return null;
+
     }
 
     /**
@@ -145,8 +189,45 @@
     }
 
     public void messageReceived(Serializable msg, Member sender) {
-        throw new UnsupportedOperationException();
         //todo implement all the messages that we can receive
+        //messages we can receive are MSG_PROXY, MSG_BACKUP
+        if ( !(msg instanceof MapMessage) ) return;
+
+        MapMessage mapmsg = (MapMessage)msg;
+        if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY ) {
+            MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
+            entry.setBackup(false);
+            entry.setProxy(true);
+            entry.setBackupNode(mapmsg.getBackupNode());
+            super.put(entry.getKey(),entry);
+        }
+        
+        if ( mapmsg.getMsgType() == MapMessage.MSG_BACKUP ) {
+            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+            if ( entry == null ) {
+                entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
+                entry.setBackup(true);
+                entry.setProxy(false);
+                entry.setBackupNode(mapmsg.getBackupNode());
+                super.put(entry.getKey(), entry);
+            } else {
+                if ( mapmsg.isDiff() ) {
+                    if ( entry.getValue() instanceof Diffable ) {
+                        Diffable diff = (Diffable)entry.getValue();
+                        try {
+                            diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
+                        }catch ( IOException x ) {
+                            log.error("Unable to apply diff to key:"+entry.getKey(),x);
+                        }
+                    } else {
+                        log.warn("Received a DIFF replication, but object["+entry.getValue()+"] does not implement Diffable");
+                    }
+                } else {
+                    entry.setValue(mapmsg.getValue());
+                }
+            }
+        }
+        
     }
     
     public boolean accept(Serializable msg, Member sender) {
@@ -304,7 +385,17 @@
     }
 
     public boolean containsValue(Object value) {
-        return super.containsValue(value);
+        if ( value == null ) {
+            return super.containsValue(value);
+        } else {
+            Iterator i = super.entrySet().iterator();
+            while (i.hasNext()) {
+                Map.Entry e = (Map.Entry) i.next();
+                MapEntry entry = (MapEntry) e.getValue();
+                if (entry.isPrimary() && value.equals(entry.getValue())) return true;
+            }//while
+            return false;
+        }//end if
     }
 
     public Object clone() {
@@ -440,11 +531,11 @@
         }
         
         public int hashCode() {
-            return key.hashCode();
+            return value.hashCode();
         }
         
         public boolean equals(Object o) {
-            return key.equals(o);
+            return value.equals(o);
         }
         
         /**
@@ -486,6 +577,7 @@
         public static final int MSG_RETRIEVE_BACKUP = 2;
         public static final int MSG_PROXY = 3;
         public static final int MSG_REMOVE = 4;
+        public static final int MSG_STATE = 5;
         
         private byte[] mapId;
         private int msgtype;
@@ -545,7 +637,8 @@
             in.read(mapId);
             msgtype = in.readInt();
             switch (msgtype) {
-                case MSG_BACKUP: {
+                case MSG_BACKUP:
+                case MSG_STATE: {
                     diff = in.readBoolean();
                     key = (Serializable)in.readObject();
                     if ( diff ) {
@@ -576,7 +669,8 @@
             out.write(mapId);
             out.writeInt(msgtype);
             switch (msgtype) {
-                case MSG_BACKUP: {
+                case MSG_BACKUP:
+                case MSG_STATE: {
                     out.writeBoolean(diff);
                     out.writeObject(key);
                     if ( diff ) {
@@ -601,6 +695,10 @@
                 }
             }//switch
         }//writeExternal
+        
+        public Object clone() {
+            return new MapMessage(this.mapId,this.msgtype,this.diff,this.key,this.value,this.diffvalue,this.node);
+        }
     }//MapMessage
 
 
 //------------------------------------------------------------------------------    



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to