Author: fhanik
Date: Thu Aug  9 16:30:41 2007
New Revision: 564422

URL: http://svn.apache.org/viewvc?view=rev&rev=564422
Log:
backport from trunk
http://issues.apache.org/bugzilla/show_bug.cgi?id=43053

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Aug  9 16:30:41 2007
@@ -239,7 +239,8 @@
                                         null, 
                                         null, 
                                         null, 
-                                        wrap(channel.getLocalMember(false)));
+                                        channel.getLocalMember(false),
+                                        new Member[0]);
         if ( channel.getMembers().length > 0 ) {
             //send a ping, wait for all nodes to reply
             Response[] resp = rpcChannel.send(channel.getMembers(), 
@@ -287,7 +288,7 @@
     protected void broadcast(int msgtype, boolean rpc) throws ChannelException 
{
         //send out a map membership message, only wait for the first reply
         MapMessage msg = new MapMessage(this.mapContextName, msgtype,
-                                        false, null, null, null, 
wrap(channel.getLocalMember(false)));
+                                        false, null, null, null, 
channel.getLocalMember(false), new Member[0]);
         if ( rpc) {
             Response[] resp = rpcChannel.send(channel.getMembers(), msg, 
rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
             for (int i = 0; i < resp.length; i++) {
@@ -391,6 +392,7 @@
                     msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                          true, (Serializable) entry.getKey(), 
null,
                                          rentry.getDiff(),
+                                         entry.getPrimary(),
                                          entry.getBackupNodes());
                 } catch (IOException x) {
                     log.error("Unable to diff object. Will replicate the 
entire object instead.", x);
@@ -404,7 +406,7 @@
                 msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                      false, (Serializable) entry.getKey(),
                                      (Serializable) entry.getValue(),
-                                     null, entry.getBackupNodes());
+                                     null, 
entry.getPrimary(),entry.getBackupNodes());
 
             }
             try {
@@ -439,7 +441,7 @@
             Member backup = members.length > 0 ? (Member) members[0] : null;
             if (backup != null) {
                 MapMessage msg = new MapMessage(mapContextName, 
getStateMessageType(), false,
-                                                null, null, null, null);
+                                                null, null, null, null, null);
                 Response[] resp = rpcChannel.send(new Member[] {backup}, msg, 
rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
                 if (resp.length > 0) {
                     synchronized (stateMutex) {
@@ -506,7 +508,7 @@
                         boolean copy = (mapmsg.getMsgType() == 
mapmsg.MSG_STATE_COPY);
                         MapMessage me = new MapMessage(mapContextName, 
                                                        
copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
-                            false, (Serializable) entry.getKey(), 
copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
+                            false, (Serializable) entry.getKey(), 
copy?(Serializable) entry.getValue():null, null, 
entry.getPrimary(),entry.getBackupNodes());
                         list.add(me);
                     }
                 }
@@ -534,9 +536,9 @@
         try {
             mapmsg.deserialize(getExternalLoaders());
             if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-                mapMemberAdded(mapmsg.getBackupNodes()[0]);
+                mapMemberAdded(mapmsg.getPrimary());
             } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
-                memberAlive(mapmsg.getBackupNodes()[0]);
+                memberAlive(mapmsg.getPrimary());
             }
         } catch (IOException x ) {
             log.error("Unable to deserialize MapMessage.",x);
@@ -565,11 +567,11 @@
         if ( log.isTraceEnabled() ) 
             log.trace("Map message received from:"+sender.getName()+" 
msg:"+mapmsg);
         if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-            mapMemberAdded(mapmsg.getBackupNodes()[0]);
+            mapMemberAdded(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
-            memberDisappeared(mapmsg.getBackupNodes()[0]);
+            memberDisappeared(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
@@ -579,11 +581,13 @@
                 entry.setBackup(false);
                 entry.setProxy(true);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 super.put(entry.getKey(), entry);
             } else {
                 entry.setProxy(true);
                 entry.setBackup(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
             }
         }
 
@@ -598,6 +602,7 @@
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof 
ReplicatedMapEntry ) {
                     
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                 }
@@ -605,6 +610,7 @@
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 if (entry.getValue() instanceof ReplicatedMapEntry) {
                     ReplicatedMapEntry diff = (ReplicatedMapEntry) 
entry.getValue();
                     if (mapmsg.isDiff()) {
@@ -663,6 +669,7 @@
                         try {
                             Member[] backup = publishEntryInfo(entry.getKey(), 
entry.getValue());
                             entry.setBackupNodes(backup);
+                            entry.setPrimary(channel.getLocalMember(false));
                         } catch (ChannelException x) {
                             log.error("Unable to select backup node.", x);
                         } //catch
@@ -700,6 +707,7 @@
         synchronized (mapMembers) {
             removed = (mapMembers.remove(member) != null );
         }
+        
         Iterator i = super.entrySet().iterator();
         while (i.hasNext()) {
             Map.Entry e = (Map.Entry) i.next();
@@ -708,10 +716,36 @@
                 try {
                     Member[] backup = publishEntryInfo(entry.getKey(), 
entry.getValue());
                     entry.setBackupNodes(backup);
+                    entry.setPrimary(channel.getLocalMember(false));
                 } catch (ChannelException x) {
                     log.error("Unable to relocate[" + entry.getKey() + "] to a 
new backup node", x);
                 }
+            } else if (member.equals(entry.getPrimary())) {
+                entry.setPrimary(null);
             } //end if
+            
+            if ( entry.isProxy() &&
+                 entry.getPrimary() == null && 
+                 entry.getBackupNodes()!=null && 
+                 entry.getBackupNodes().length == 1 &&
+                 entry.getBackupNodes()[0].equals(member) ) {
+                //remove proxies that have no backup nor primaries
+                i.remove();
+            } else if ( entry.isBackup() &&
+                        entry.getBackupNodes()!=null && 
+                        entry.getBackupNodes().length == 1 &&
+                        
entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+                try {
+                    entry.setPrimary(channel.getLocalMember(false));
+                    entry.setBackup(false);
+                    entry.setProxy(false);
+                    Member[] backup = publishEntryInfo(entry.getKey(), 
entry.getValue());
+                    entry.setBackupNodes(backup);
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a 
new backup node", x);
+                }
+            }
+
         } //while
     }
 
@@ -761,7 +795,7 @@
 
         try {
             if (getMapMembers().length > 0 && notify) {
-                MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+                MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null);
                 getChannel().send(getMapMembers(), msg, 
getChannelSendOptions());
             }
         } catch ( ChannelException x ) {
@@ -786,7 +820,7 @@
                 if ( !entry.isBackup() ) {
                     //make sure we don't retrieve from ourselves
                     msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_RETRIEVE_BACKUP, false,
-                                         (Serializable) key, null, null, null);
+                                         (Serializable) key, null, null, 
null,null);
                     Response[] resp = 
getRpcChannel().send(entry.getBackupNodes(),msg, 
this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, 
getRpcTimeout());
                     if (resp == null || resp.length == 0) {
                         //no responses
@@ -807,13 +841,13 @@
                     backup = publishEntryInfo(key, entry.getValue());
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
-                    msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+                    msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
                     Member[] dest = getMapMembersExcl(backup);
                     if ( dest!=null && dest.length >0) {
                         getChannel().send(dest, msg, getChannelSendOptions());
                     }
                 }
-
+                entry.setPrimary(channel.getLocalMember(false));
                 entry.setBackupNodes(backup);
                 entry.setBackup(false);
                 entry.setProxy(false);
@@ -875,6 +909,7 @@
             MapEntry entry = new MapEntry(key,value);
             entry.setBackup(false);
             entry.setProxy(false);
+            entry.setPrimary(channel.getLocalMember(false));
     
             Object old = null;
     
@@ -963,7 +998,7 @@
                 Map.Entry e = (Map.Entry)i.next();
                 Object key = e.getKey();
                 MapEntry entry = (MapEntry)super.get(key);
-                if ( entry.isPrimary() ) set.add(entry.getValue());
+                if ( entry != null && entry.isPrimary() ) 
set.add(entry.getValue());
             }
             return Collections.unmodifiableSet(set);
         }
@@ -977,7 +1012,7 @@
                 Map.Entry e = (Map.Entry)i.next();
                 Object key = e.getKey();
                 MapEntry entry = (MapEntry)super.get(key);
-                if ( entry.isPrimary() ) set.add(key);
+                if ( entry!=null && entry.isPrimary() ) set.add(key);
             }
             return Collections.unmodifiableSet(set);
 
@@ -1026,7 +1061,7 @@
         private boolean backup;
         private boolean proxy;
         private Member[] backupNodes;
-
+        private Member primary;
         private Object key;
         private Object value;
 
@@ -1080,6 +1115,14 @@
         public Member[] getBackupNodes() {
             return backupNodes;
         }
+        
+        public void setPrimary(Member m) {
+            primary = m;
+        }
+        
+        public Member getPrimary() {
+            return primary;
+        }
 
         public Object getValue() {
             return value;
@@ -1172,6 +1215,7 @@
         private byte[] keydata;
         private byte[] diffvalue;
         private Member[] nodes;
+        private Member primary;
         
         public String toString() {
             StringBuffer buf = new StringBuffer("MapMessage[context=");
@@ -1205,7 +1249,7 @@
 
         public MapMessage(byte[] mapId,int msgtype, boolean diff,
                           Serializable key, Serializable value,
-                          byte[] diffvalue, Member[] nodes)  {
+                          byte[] diffvalue, Member primary, Member[] nodes)  {
             this.mapId = mapId;
             this.msgtype = msgtype;
             this.diff = diff;
@@ -1213,6 +1257,7 @@
             this.value = value;
             this.diffvalue = diffvalue;
             this.nodes = nodes;
+            this.primary = primary;
             setValue(value);
             setKey(key);
         }
@@ -1283,6 +1328,14 @@
         private void setBackUpNodes(Member[] nodes) {
             this.nodes = nodes;
         }
+        
+        public Member getPrimary() {
+            return primary;
+        }
+        
+        private void setPrimary(Member m) {
+            primary = m;
+        }
 
         public byte[] getMapId() {
             return mapId;
@@ -1335,7 +1388,7 @@
          * @return Object
          */
         public Object clone() {
-            MapMessage msg = new MapMessage(this.mapId, this.msgtype, 
this.diff, this.key, this.value, this.diffvalue, this.nodes);
+            MapMessage msg = new MapMessage(this.mapId, this.msgtype, 
this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes);
             msg.keydata = this.keydata;
             msg.valuedata = this.valuedata;
             return msg;

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Aug  9 16:30:41 2007
@@ -151,7 +151,7 @@
                 backup = wrap(next);
                 //publish the backup data to one node
                 msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_BACKUP, false,
-                                     (Serializable) key, (Serializable) value, 
null, backup);
+                                     (Serializable) key, (Serializable) value, 
null, channel.getLocalMember(false), backup);
                 if ( log.isTraceEnabled() ) 
                     log.trace("Publishing backup data:"+msg+" to: 
"+next.getName());
                 UniqueId id = getChannel().send(backup, msg, 
getChannelSendOptions());
@@ -167,7 +167,7 @@
                 Member[] proxies = excludeFromSet(backup, getMapMembers());
                 if (success && proxies.length > 0 ) {
                     msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_PROXY, false,
-                                         (Serializable) key, null, null, 
backup);
+                                         (Serializable) key, null, null, 
channel.getLocalMember(false),backup);
                     if ( log.isTraceEnabled() ) 
                     log.trace("Publishing proxy data:"+msg+" to: 
"+Arrays.toNameString(proxies));
                     getChannel().send(proxies, msg, getChannelSendOptions());

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Thu Aug  9 16:30:41 2007
@@ -112,7 +112,7 @@
 
         //publish the data out to all nodes
         MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_COPY, false,
-                                        (Serializable) key, (Serializable) 
value, null, backup);
+                                        (Serializable) key, (Serializable) 
value, null,channel.getLocalMember(false), backup);
 
         getChannel().send(getMapMembers(), msg, getChannelSendOptions());
 

Modified: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Aug  9 16:30:41 2007
@@ -166,6 +166,7 @@
             String[] columnNames = {
                                    "Key",
                                    "Value",
+                                   "Primary Node",
                                    "Backup Node",
                                    "isPrimary",
                                    "isProxy",
@@ -198,10 +199,11 @@
                 switch (col) {
                     case 0: return entry.getKey();
                     case 1: return entry.getValue();
-                    case 2: return getMemberNames(entry.getBackupNodes());
-                    case 3: return new Boolean(entry.isPrimary());
-                    case 4: return new Boolean(entry.isProxy());
-                    case 5: return new Boolean(entry.isBackup());
+                    case 2: return 
entry.getPrimary()!=null?entry.getPrimary().getName():"null";
+                    case 3: return getMemberNames(entry.getBackupNodes());
+                    case 4: return new Boolean(entry.isPrimary());
+                    case 5: return new Boolean(entry.isProxy());
+                    case 6: return new Boolean(entry.isBackup());
                     default: return "";
                 }
                 
@@ -408,9 +410,9 @@
             cell.setBackground(Color.WHITE);
             if ( row > 0 ) {
                 Color color = null;
-                boolean primary = ( (Boolean) table.getValueAt(row, 
3)).booleanValue();
-                boolean proxy = ( (Boolean) table.getValueAt(row, 
4)).booleanValue();
-                boolean backup = ( (Boolean) table.getValueAt(row, 
5)).booleanValue();
+                boolean primary = ( (Boolean) table.getValueAt(row, 
4)).booleanValue();
+                boolean proxy = ( (Boolean) table.getValueAt(row, 
5)).booleanValue();
+                boolean backup = ( (Boolean) table.getValueAt(row, 
6)).booleanValue();
                 if (primary) color = Color.GREEN;
                 else if (proxy) color = Color.RED;
                 else if (backup) color = Color.BLUE;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to