Author: fhanik
Date: Sun Mar 12 07:39:15 2006
New Revision: 385300

URL: http://svn.apache.org/viewcvs?rev=385300&view=rev
Log:
The replicated map implements membership logic on a per-map basis, so that you
can have multiple maps in a cluster.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=385300&r1=385299&r2=385300&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Sun Mar 12 07:39:15 2006
@@ -89,6 +89,7 @@
     private transient byte[] mapContextName;
     private transient boolean stateTransferred = false;
     private transient Object stateMutex = new Object();
+    private transient ArrayList mapMembers = new ArrayList();
     
     
 
//------------------------------------------------------------------------------
    
@@ -122,6 +123,19 @@
         this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
         this.channel.addChannelListener(this);
         this.channel.addMembershipListener(this);
+
+        try {
+            MapMessage msg = new 
MapMessage(this.mapContextName,MapMessage.MSG_START,
+                                            
false,null,null,null,channel.getLocalMember());
+            Response[] resp = 
rpcChannel.send(channel.getMembers(),msg,rpcChannel.FIRST_REPLY,timeout);
+            for ( int i=0; i<resp.length; i++ ) {
+                messageReceived(resp[i].getMessage(),resp[i].getSource());
+            }
+        }catch ( ChannelException x ) {
+            log.warn("Unable to send stop message.");
+        }
+
+
         transferState();
     }
     
@@ -130,6 +144,14 @@
     }
     
     public void finalize() {
+        try {
+            MapMessage msg = new 
MapMessage(this.mapContextName,MapMessage.MSG_STOP,
+                                            
false,null,null,null,channel.getLocalMember());
+            if ( channel!=null) channel.send(channel.getMembers(),msg);
+        }catch ( ChannelException x ) {
+            log.warn("Unable to send stop message.",x);
+        }
+        
         if ( this.rpcChannel!=null ) {
             this.rpcChannel.breakDown();
         }
@@ -139,6 +161,7 @@
         }
         this.rpcChannel = null;
         this.channel = null;
+        this.mapMembers.clear();
         super.clear();
         this.stateTransferred = false;
     }
@@ -205,7 +228,7 @@
 
//------------------------------------------------------------------------------
   
     public void transferState() {
         try {
-            Member backup = 
channel.getMembers().length>0?channel.getMembers()[0]:null;
+            Member backup = mapMembers.size()>0?(Member)mapMembers.get(0):null;
             if ( backup != null ) {
                 MapMessage msg = new 
MapMessage(mapContextName,MapMessage.MSG_STATE,false,
                                                 null,null,null,null);
@@ -244,6 +267,13 @@
         if ( !(msg instanceof MapMessage) ) return null;
         MapMessage mapmsg = (MapMessage)msg;
         
+        //map start request
+        if ( mapmsg.getMsgType() == mapmsg.MSG_START ) {
+            mapMemberAdded(sender);
+            mapmsg.setBackUpNode(channel.getLocalMember());
+            return mapmsg;
+        }
+
         //backup request
         if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) {
             MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
@@ -281,7 +311,13 @@
      * @param sender Member
      */
     public void leftOver(Serializable msg, Member sender) {
-        //ignore left over responses
+        //left over membership messages
+        if ( !(msg instanceof MapMessage) ) return;
+
+        MapMessage mapmsg = (MapMessage)msg;
+        if ( mapmsg.getMsgType() == MapMessage.MSG_START ) {
+            mapMemberAdded(mapmsg.getBackupNode());
+        }
     }
 
     public void messageReceived(Serializable msg, Member sender) {
@@ -290,6 +326,15 @@
         if ( !(msg instanceof MapMessage) ) return;
 
         MapMessage mapmsg = (MapMessage)msg;
+
+        if ( mapmsg.getMsgType() == MapMessage.MSG_START ) {
+            mapMemberAdded(mapmsg.getBackupNode());
+        }
+
+        if ( mapmsg.getMsgType() == MapMessage.MSG_STOP ) {
+            memberDisappeared(mapmsg.getBackupNode());
+        }
+
         if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY ) {
             MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
             entry.setBackup(false);
@@ -341,8 +386,9 @@
         return false;
     }
     
-    public void memberAdded(Member member) {
+    public void mapMemberAdded(Member member) {
         //select a backup node if we don't have one
+        mapMembers.add(member);
         synchronized (stateMutex) {
             Iterator i = super.entrySet().iterator();
             while (i.hasNext()) {
@@ -360,7 +406,12 @@
         }//synchronized
         
     }
+    public void memberAdded(Member member) {
+        //do nothing
+    }
+
     public void memberDisappeared(Member member) {
+        mapMembers.remove(member);
         //todo move all sessions that are primary here to and have this member 
as 
         //a backup
         Iterator i = super.entrySet().iterator();
@@ -380,14 +431,13 @@
     
     int currentNode = 0;
     public Member getNextBackupNode() {
-        Member[] members = channel.getMembers();
-        if ( members.length == 0 ) return null;
+        if ( mapMembers.size() == 0 ) return null;
         int node = currentNode++;
-        if ( node >= members.length ) {
+        if ( node >= mapMembers.size() ) {
             node = 0;
             currentNode = 0;
         }
-        return members[node];
+        return (Member)mapMembers.get(node);
     }
     
     
@@ -411,7 +461,7 @@
         //publish the data out to all nodes
         MapMessage msg = new MapMessage(this.mapContextName, 
MapMessage.MSG_PROXY, false,
                                         (Serializable) key, null, null, 
backup);
-        channel.send(channel.getMembers(), msg);
+        channel.send((Member[])mapMembers.toArray(new 
Member[mapMembers.size()]), msg);
 
         //publish the backup data to one node
         msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false,
@@ -469,13 +519,14 @@
         Object old = null;
         
         //make sure that any old values get removed
-        if ( containsKey(key) ) old = (MapEntry)remove(key);
+        if ( containsKey(key) ) old = remove(key);
         try {
             Member backup = publishEntryInfo(key, value);
             entry.setBackupNode(backup);
         } catch (ChannelException x) {
             log.error("Unable to replicate out data for a 
LazyReplicatedMap.put operation", x);
         }
+        System.out.println("adding key="+key+" entry="+entry);
         super.put(key,entry);
         return old;
     }
@@ -719,6 +770,8 @@
         public static final int MSG_PROXY = 3;
         public static final int MSG_REMOVE = 4;
         public static final int MSG_STATE = 5;
+        public static final int MSG_START = 6;
+        public static final int MSG_STOP = 7;
         
         private byte[] mapId;
         private int msgtype;
@@ -767,6 +820,10 @@
             return node;
         }
         
+        private void setBackUpNode(Member node) {
+            this.node = node;
+        }
+        
         public byte[] getMapId() {
             return mapId;
         }
@@ -811,6 +868,14 @@
                     if ( d.length > 0 ) node = McastMember.getMember(d);
                     break;
                 }
+                case MSG_START : 
+                     MSG_STOP  :{
+                     byte[] d = new byte[in.readInt()];
+                     in.read(d);
+                     if ( d.length > 0 ) node = McastMember.getMember(d);
+                     break;
+                }
+
             }//switch
         }//readExternal
 
@@ -845,6 +910,13 @@
                 }
                 case MSG_PROXY: {
                     out.writeObject(key);
+                    byte[] d = 
node!=null?((McastMember)node).getData(false):new byte[0];
+                    out.writeInt(d.length);
+                    out.write(d);
+                    break;
+                }
+                case MSG_START:
+                     MSG_STOP : {
                     byte[] d = 
node!=null?((McastMember)node).getData(false):new byte[0];
                     out.writeInt(d.length);
                     out.write(d);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=385300&r1=385299&r2=385300&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 Sun Mar 12 07:39:15 2006
@@ -97,6 +97,7 @@
     
     public void messageReceived(Serializable msg, Member sender) {
         RpcMessage rmsg = (RpcMessage)msg;
+System.out.println("Received RPC message with message:"+rmsg.message);        
         RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
         if ( rmsg.reply ) {
             RpcCollector collector = (RpcCollector)responseMap.get(key);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=385300&r1=385299&r2=385300&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
 Sun Mar 12 07:39:15 2006
@@ -65,7 +65,6 @@
     }
     
     public void messageReceived(Serializable msg, Member source) {
-        System.out.println("Recieved: "+msg);
         table.dataModel.getValueAt(-1,-1);
     }
     
@@ -135,7 +134,7 @@
 
     public static class SimpleTableDemo
         extends JPanel implements ActionListener{
-        private static int WIDTH = 500;
+        private static int WIDTH = 550;
         
         private LazyReplicatedMap map;
         private boolean DEBUG = false;
@@ -217,7 +216,7 @@
             
             //create a add value button
             JPanel addpanel = new JPanel();
-            addpanel.setPreferredSize(new Dimension(WIDTH,20));
+            addpanel.setPreferredSize(new Dimension(WIDTH,30));
             addpanel.add(createButton("Add","add"));
             addpanel.add(txtAddKey);
             addpanel.add(txtAddValue);
@@ -225,7 +224,7 @@
             
             //create a remove value button
             JPanel removepanel = new JPanel( );
-            removepanel.setPreferredSize(new Dimension(WIDTH,20));
+            removepanel.setPreferredSize(new Dimension(WIDTH,30));
             removepanel.add(createButton("Remove","remove"));
             removepanel.add(txtRemoveKey);
             
@@ -236,7 +235,7 @@
             changepanel.add(createButton("Change","change"));
             changepanel.add(txtChangeKey);
             changepanel.add(txtChangeValue);
-            changepanel.setPreferredSize(new Dimension(WIDTH,20));
+            changepanel.setPreferredSize(new Dimension(WIDTH,30));
 
             add(changepanel);
             
@@ -244,7 +243,7 @@
             JPanel syncpanel = new JPanel( );
             syncpanel.add(createButton("Synchronize","sync"));
             syncpanel.add(createButton("Replicate","replicate"));
-            syncpanel.setPreferredSize(new Dimension(WIDTH,20));
+            syncpanel.setPreferredSize(new Dimension(WIDTH,30));
 
             add(syncpanel);
 
@@ -323,6 +322,7 @@
 
             //Display the window.
             frame.setSize(450,250);
+            newContentPane.setSize(450,300);
             frame.pack();
             frame.setVisible(true);
             return newContentPane;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to