Author: fhanik
Date: Thu Aug  9 11:43:12 2007
New Revision: 564335

URL: http://svn.apache.org/viewvc?view=rev&rev=564335
Log:
Publish primary node information as well

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=564335&r1=564334&r2=564335
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Aug  9 11:43:12 2007
@@ -239,7 +239,8 @@
                                         null, 
                                         null, 
                                         null, 
-                                        wrap(channel.getLocalMember(false)));
+                                        channel.getLocalMember(false),
+                                        new Member[0]);
         if ( channel.getMembers().length > 0 ) {
             //send a ping, wait for all nodes to reply
             Response[] resp = rpcChannel.send(channel.getMembers(), 
@@ -287,7 +288,7 @@
     protected void broadcast(int msgtype, boolean rpc) throws ChannelException 
{
         //send out a map membership message, only wait for the first reply
         MapMessage msg = new MapMessage(this.mapContextName, msgtype,
-                                        false, null, null, null, 
wrap(channel.getLocalMember(false)));
+                                        false, null, null, null, 
channel.getLocalMember(false), new Member[0]);
         if ( rpc) {
             Response[] resp = rpcChannel.send(channel.getMembers(), msg, 
rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
             for (int i = 0; i < resp.length; i++) {
@@ -391,6 +392,7 @@
                     msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                          true, (Serializable) entry.getKey(), 
null,
                                          rentry.getDiff(),
+                                         entry.getPrimary(),
                                          entry.getBackupNodes());
                 } catch (IOException x) {
                     log.error("Unable to diff object. Will replicate the 
entire object instead.", x);
@@ -404,7 +406,7 @@
                 msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                      false, (Serializable) entry.getKey(),
                                      (Serializable) entry.getValue(),
-                                     null, entry.getBackupNodes());
+                                     null, 
entry.getPrimary(),entry.getBackupNodes());
 
             }
             try {
@@ -439,7 +441,7 @@
             Member backup = members.length > 0 ? (Member) members[0] : null;
             if (backup != null) {
                 MapMessage msg = new MapMessage(mapContextName, 
getStateMessageType(), false,
-                                                null, null, null, null);
+                                                null, null, null, null, null);
                 Response[] resp = rpcChannel.send(new Member[] {backup}, msg, 
rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
                 if (resp.length > 0) {
                     synchronized (stateMutex) {
@@ -506,7 +508,7 @@
                         boolean copy = (mapmsg.getMsgType() == 
mapmsg.MSG_STATE_COPY);
                         MapMessage me = new MapMessage(mapContextName, 
                                                        
copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
-                            false, (Serializable) entry.getKey(), 
copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
+                            false, (Serializable) entry.getKey(), 
copy?(Serializable) entry.getValue():null, null, 
entry.getPrimary(),entry.getBackupNodes());
                         list.add(me);
                     }
                 }
@@ -534,9 +536,9 @@
         try {
             mapmsg.deserialize(getExternalLoaders());
             if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-                mapMemberAdded(mapmsg.getBackupNodes()[0]);
+                mapMemberAdded(mapmsg.getPrimary());
             } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
-                memberAlive(mapmsg.getBackupNodes()[0]);
+                memberAlive(mapmsg.getPrimary());
             }
         } catch (IOException x ) {
             log.error("Unable to deserialize MapMessage.",x);
@@ -565,11 +567,11 @@
         if ( log.isTraceEnabled() ) 
             log.trace("Map message received from:"+sender.getName()+" 
msg:"+mapmsg);
         if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-            mapMemberAdded(mapmsg.getBackupNodes()[0]);
+            mapMemberAdded(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
-            memberDisappeared(mapmsg.getBackupNodes()[0]);
+            memberDisappeared(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
@@ -579,11 +581,13 @@
                 entry.setBackup(false);
                 entry.setProxy(true);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 super.put(entry.getKey(), entry);
             } else {
                 entry.setProxy(true);
                 entry.setBackup(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
             }
         }
 
@@ -598,6 +602,7 @@
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof 
ReplicatedMapEntry ) {
                     
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                 }
@@ -761,7 +766,7 @@
 
         try {
             if (getMapMembers().length > 0 && notify) {
-                MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+                MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null);
                 getChannel().send(getMapMembers(), msg, 
getChannelSendOptions());
             }
         } catch ( ChannelException x ) {
@@ -786,7 +791,7 @@
                 if ( !entry.isBackup() ) {
                     //make sure we don't retrieve from ourselves
                     msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_RETRIEVE_BACKUP, false,
-                                         (Serializable) key, null, null, null);
+                                         (Serializable) key, null, null, 
null,null);
                     Response[] resp = 
getRpcChannel().send(entry.getBackupNodes(),msg, 
this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, 
getRpcTimeout());
                     if (resp == null || resp.length == 0) {
                         //no responses
@@ -807,7 +812,7 @@
                     backup = publishEntryInfo(key, entry.getValue());
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
-                    msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+                    msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
                     Member[] dest = getMapMembersExcl(backup);
                     if ( dest!=null && dest.length >0) {
                         getChannel().send(dest, msg, getChannelSendOptions());
@@ -875,6 +880,7 @@
             MapEntry entry = new MapEntry(key,value);
             entry.setBackup(false);
             entry.setProxy(false);
+            entry.setPrimary(channel.getLocalMember(false));
     
             Object old = null;
     
@@ -1026,7 +1032,7 @@
         private boolean backup;
         private boolean proxy;
         private Member[] backupNodes;
-
+        private Member primary;
         private Object key;
         private Object value;
 
@@ -1080,6 +1086,14 @@
         public Member[] getBackupNodes() {
             return backupNodes;
         }
+        
+        public void setPrimary(Member m) {
+            primary = m;
+        }
+        
+        public Member getPrimary() {
+            return primary;
+        }
 
         public Object getValue() {
             return value;
@@ -1172,6 +1186,7 @@
         private byte[] keydata;
         private byte[] diffvalue;
         private Member[] nodes;
+        private Member primary;
         
         public String toString() {
             StringBuffer buf = new StringBuffer("MapMessage[context=");
@@ -1205,7 +1220,7 @@
 
         public MapMessage(byte[] mapId,int msgtype, boolean diff,
                           Serializable key, Serializable value,
-                          byte[] diffvalue, Member[] nodes)  {
+                          byte[] diffvalue, Member primary, Member[] nodes)  {
             this.mapId = mapId;
             this.msgtype = msgtype;
             this.diff = diff;
@@ -1213,6 +1228,7 @@
             this.value = value;
             this.diffvalue = diffvalue;
             this.nodes = nodes;
+            this.primary = primary;
             setValue(value);
             setKey(key);
         }
@@ -1283,6 +1299,14 @@
         private void setBackUpNodes(Member[] nodes) {
             this.nodes = nodes;
         }
+        
+        public Member getPrimary() {
+            return primary;
+        }
+        
+        private void setPrimary(Member m) {
+            primary = m;
+        }
 
         public byte[] getMapId() {
             return mapId;
@@ -1335,7 +1359,7 @@
          * @return Object
          */
         public Object clone() {
-            MapMessage msg = new MapMessage(this.mapId, this.msgtype, 
this.diff, this.key, this.value, this.diffvalue, this.nodes);
+            MapMessage msg = new MapMessage(this.mapId, this.msgtype, 
this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes);
             msg.keydata = this.keydata;
             msg.valuedata = this.valuedata;
             return msg;

Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?view=diff&rev=564335&r1=564334&r2=564335
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Aug  9 11:43:12 2007
@@ -151,7 +151,7 @@
                 backup = wrap(next);
                 //publish the backup data to one node
                 msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_BACKUP, false,
-                                     (Serializable) key, (Serializable) value, 
null, backup);
+                                     (Serializable) key, (Serializable) value, 
null, channel.getLocalMember(false), backup);
                 if ( log.isTraceEnabled() ) 
                     log.trace("Publishing backup data:"+msg+" to: 
"+next.getName());
                 UniqueId id = getChannel().send(backup, msg, 
getChannelSendOptions());
@@ -167,7 +167,7 @@
                 Member[] proxies = excludeFromSet(backup, getMapMembers());
                 if (success && proxies.length > 0 ) {
                     msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_PROXY, false,
-                                         (Serializable) key, null, null, 
backup);
+                                         (Serializable) key, null, null, 
channel.getLocalMember(false),backup);
                     if ( log.isTraceEnabled() ) 
                     log.trace("Publishing proxy data:"+msg+" to: 
"+Arrays.toNameString(proxies));
                     getChannel().send(proxies, msg, getChannelSendOptions());

Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?view=diff&rev=564335&r1=564334&r2=564335
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Thu Aug  9 11:43:12 2007
@@ -112,7 +112,7 @@
 
         //publish the data out to all nodes
         MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_COPY, false,
-                                        (Serializable) key, (Serializable) 
value, null, backup);
+                                        (Serializable) key, (Serializable) 
value, null,channel.getLocalMember(false), backup);
 
         getChannel().send(getMapMembers(), msg, getChannelSendOptions());
 

Modified: tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java?view=diff&rev=564335&r1=564334&r2=564335
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Aug  9 11:43:12 2007
@@ -166,6 +166,7 @@
             String[] columnNames = {
                                    "Key",
                                    "Value",
+                                   "Primary Node",
                                    "Backup Node",
                                    "isPrimary",
                                    "isProxy",
@@ -198,10 +199,11 @@
                 switch (col) {
                     case 0: return entry.getKey();
                     case 1: return entry.getValue();
-                    case 2: return getMemberNames(entry.getBackupNodes());
-                    case 3: return new Boolean(entry.isPrimary());
-                    case 4: return new Boolean(entry.isProxy());
-                    case 5: return new Boolean(entry.isBackup());
+                    case 2: return 
entry.getPrimary()!=null?entry.getPrimary().getName():"null";
+                    case 3: return getMemberNames(entry.getBackupNodes());
+                    case 4: return new Boolean(entry.isPrimary());
+                    case 5: return new Boolean(entry.isProxy());
+                    case 6: return new Boolean(entry.isBackup());
                     default: return "";
                 }
                 
@@ -408,9 +410,9 @@
             cell.setBackground(Color.WHITE);
             if ( row > 0 ) {
                 Color color = null;
-                boolean primary = ( (Boolean) table.getValueAt(row, 
3)).booleanValue();
-                boolean proxy = ( (Boolean) table.getValueAt(row, 
4)).booleanValue();
-                boolean backup = ( (Boolean) table.getValueAt(row, 
5)).booleanValue();
+                boolean primary = ( (Boolean) table.getValueAt(row, 
4)).booleanValue();
+                boolean proxy = ( (Boolean) table.getValueAt(row, 
5)).booleanValue();
+                boolean backup = ( (Boolean) table.getValueAt(row, 
6)).booleanValue();
                 if (primary) color = Color.GREEN;
                 else if (proxy) color = Color.RED;
                 else if (backup) color = Color.BLUE;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to