Author: fhanik
Date: Thu Aug 9 11:43:12 2007
New Revision: 564335

URL: http://svn.apache.org/viewvc?view=rev&rev=564335
Log:
Publish primary node information as well

Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=564335&r1=564334&r2=564335 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Aug 9 11:43:12 2007 @@ -239,7 +239,8 @@ null, null, null, - wrap(channel.getLocalMember(false))); + channel.getLocalMember(false), + new Member[0]); if ( channel.getMembers().length > 0 ) { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), @@ -287,7 +288,7 @@ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, - false, null, null, null, wrap(channel.getLocalMember(false))); + false, null, null, null, channel.getLocalMember(false), new Member[0]); if ( rpc) { Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout); for (int i = 0; i < resp.length; i++) { @@ -391,6 +392,7 @@ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable) entry.getKey(), null, rentry.getDiff(), + entry.getPrimary(), entry.getBackupNodes()); } catch (IOException x) { log.error("Unable to diff object. 
Will replicate the entire object instead.", x); @@ -404,7 +406,7 @@ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), - null, entry.getBackupNodes()); + null, entry.getPrimary(),entry.getBackupNodes()); } try { @@ -439,7 +441,7 @@ Member backup = members.length > 0 ? (Member) members[0] : null; if (backup != null) { MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false, - null, null, null, null); + null, null, null, null, null); Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); if (resp.length > 0) { synchronized (stateMutex) { @@ -506,7 +508,7 @@ boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY); MapMessage me = new MapMessage(mapContextName, copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY, - false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes()); + false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getPrimary(),entry.getBackupNodes()); list.add(me); } } @@ -534,9 +536,9 @@ try { mapmsg.deserialize(getExternalLoaders()); if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getBackupNodes()[0]); + mapMemberAdded(mapmsg.getPrimary()); } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { - memberAlive(mapmsg.getBackupNodes()[0]); + memberAlive(mapmsg.getPrimary()); } } catch (IOException x ) { log.error("Unable to deserialize MapMessage.",x); @@ -565,11 +567,11 @@ if ( log.isTraceEnabled() ) log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg); if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getBackupNodes()[0]); + mapMemberAdded(mapmsg.getPrimary()); } if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { - memberDisappeared(mapmsg.getBackupNodes()[0]); + memberDisappeared(mapmsg.getPrimary()); } if (mapmsg.getMsgType() == 
MapMessage.MSG_PROXY) { @@ -579,11 +581,13 @@ entry.setBackup(false); entry.setProxy(true); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); super.put(entry.getKey(), entry); } else { entry.setProxy(true); entry.setBackup(false); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); } } @@ -598,6 +602,7 @@ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); } @@ -761,7 +766,7 @@ try { if (getMapMembers().length > 0 && notify) { - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null); + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null); getChannel().send(getMapMembers(), msg, getChannelSendOptions()); } } catch ( ChannelException x ) { @@ -786,7 +791,7 @@ if ( !entry.isBackup() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, - (Serializable) key, null, null, null); + (Serializable) key, null, null, null,null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout()); if (resp == null || resp.length == 0) { //no responses @@ -807,7 +812,7 @@ backup = publishEntryInfo(key, entry.getValue()); } else if ( entry.isProxy() ) { //invalidate the previous primary - msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); + msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); Member[] dest = 
getMapMembersExcl(backup); if ( dest!=null && dest.length >0) { getChannel().send(dest, msg, getChannelSendOptions()); @@ -875,6 +880,7 @@ MapEntry entry = new MapEntry(key,value); entry.setBackup(false); entry.setProxy(false); + entry.setPrimary(channel.getLocalMember(false)); Object old = null; @@ -1026,7 +1032,7 @@ private boolean backup; private boolean proxy; private Member[] backupNodes; - + private Member primary; private Object key; private Object value; @@ -1080,6 +1086,14 @@ public Member[] getBackupNodes() { return backupNodes; } + + public void setPrimary(Member m) { + primary = m; + } + + public Member getPrimary() { + return primary; + } public Object getValue() { return value; @@ -1172,6 +1186,7 @@ private byte[] keydata; private byte[] diffvalue; private Member[] nodes; + private Member primary; public String toString() { StringBuffer buf = new StringBuffer("MapMessage[context="); @@ -1205,7 +1220,7 @@ public MapMessage(byte[] mapId,int msgtype, boolean diff, Serializable key, Serializable value, - byte[] diffvalue, Member[] nodes) { + byte[] diffvalue, Member primary, Member[] nodes) { this.mapId = mapId; this.msgtype = msgtype; this.diff = diff; @@ -1213,6 +1228,7 @@ this.value = value; this.diffvalue = diffvalue; this.nodes = nodes; + this.primary = primary; setValue(value); setKey(key); } @@ -1283,6 +1299,14 @@ private void setBackUpNodes(Member[] nodes) { this.nodes = nodes; } + + public Member getPrimary() { + return primary; + } + + private void setPrimary(Member m) { + primary = m; + } public byte[] getMapId() { return mapId; @@ -1335,7 +1359,7 @@ * @return Object */ public Object clone() { - MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes); + MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes); msg.keydata = this.keydata; msg.valuedata = this.valuedata; return msg; Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?view=diff&rev=564335&r1=564334&r2=564335 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Aug 9 11:43:12 2007 @@ -151,7 +151,7 @@ backup = wrap(next); //publish the backup data to one node msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, - (Serializable) key, (Serializable) value, null, backup); + (Serializable) key, (Serializable) value, null, channel.getLocalMember(false), backup); if ( log.isTraceEnabled() ) log.trace("Publishing backup data:"+msg+" to: "+next.getName()); UniqueId id = getChannel().send(backup, msg, getChannelSendOptions()); @@ -167,7 +167,7 @@ Member[] proxies = excludeFromSet(backup, getMapMembers()); if (success && proxies.length > 0 ) { msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, - (Serializable) key, null, null, backup); + (Serializable) key, null, null, channel.getLocalMember(false),backup); if ( log.isTraceEnabled() ) log.trace("Publishing proxy data:"+msg+" to: "+Arrays.toNameString(proxies)); getChannel().send(proxies, msg, getChannelSendOptions()); Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?view=diff&rev=564335&r1=564334&r2=564335 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Thu Aug 9 11:43:12 2007 @@ -112,7 +112,7 @@ //publish the data out to all nodes MapMessage msg = new 
MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, - (Serializable) key, (Serializable) value, null, backup); + (Serializable) key, (Serializable) value, null,channel.getLocalMember(false), backup); getChannel().send(getMapMembers(), msg, getChannelSendOptions()); Modified: tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java?view=diff&rev=564335&r1=564334&r2=564335 ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Aug 9 11:43:12 2007 @@ -166,6 +166,7 @@ String[] columnNames = { "Key", "Value", + "Primary Node", "Backup Node", "isPrimary", "isProxy", @@ -198,10 +199,11 @@ switch (col) { case 0: return entry.getKey(); case 1: return entry.getValue(); - case 2: return getMemberNames(entry.getBackupNodes()); - case 3: return new Boolean(entry.isPrimary()); - case 4: return new Boolean(entry.isProxy()); - case 5: return new Boolean(entry.isBackup()); + case 2: return entry.getPrimary()!=null?entry.getPrimary().getName():"null"; + case 3: return getMemberNames(entry.getBackupNodes()); + case 4: return new Boolean(entry.isPrimary()); + case 5: return new Boolean(entry.isProxy()); + case 6: return new Boolean(entry.isBackup()); default: return ""; } @@ -408,9 +410,9 @@ cell.setBackground(Color.WHITE); if ( row > 0 ) { Color color = null; - boolean primary = ( (Boolean) table.getValueAt(row, 3)).booleanValue(); - boolean proxy = ( (Boolean) table.getValueAt(row, 4)).booleanValue(); - boolean backup = ( (Boolean) table.getValueAt(row, 5)).booleanValue(); + boolean primary = ( (Boolean) table.getValueAt(row, 4)).booleanValue(); + boolean proxy = ( (Boolean) table.getValueAt(row, 5)).booleanValue(); + boolean backup = ( (Boolean) table.getValueAt(row, 6)).booleanValue(); if (primary) 
color = Color.GREEN; else if (proxy) color = Color.RED; else if (backup) color = Color.BLUE; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]