Author: fhanik
Date: Thu Aug 9 16:30:41 2007
New Revision: 564422

URL: http://svn.apache.org/viewvc?view=rev&rev=564422
Log:
backport from trunk http://issues.apache.org/bugzilla/show_bug.cgi?id=43053
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Aug 9 16:30:41 2007 @@ -239,7 +239,8 @@ null, null, null, - wrap(channel.getLocalMember(false))); + channel.getLocalMember(false), + new Member[0]); if ( channel.getMembers().length > 0 ) { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), @@ -287,7 +288,7 @@ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, - false, null, null, null, wrap(channel.getLocalMember(false))); + false, null, null, null, channel.getLocalMember(false), new Member[0]); if ( rpc) { Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout); for (int i = 0; i < resp.length; i++) { @@ -391,6 +392,7 @@ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable) entry.getKey(), null, rentry.getDiff(), + entry.getPrimary(), entry.getBackupNodes()); } catch (IOException x) { log.error("Unable to diff object. 
Will replicate the entire object instead.", x); @@ -404,7 +406,7 @@ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), - null, entry.getBackupNodes()); + null, entry.getPrimary(),entry.getBackupNodes()); } try { @@ -439,7 +441,7 @@ Member backup = members.length > 0 ? (Member) members[0] : null; if (backup != null) { MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false, - null, null, null, null); + null, null, null, null, null); Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); if (resp.length > 0) { synchronized (stateMutex) { @@ -506,7 +508,7 @@ boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY); MapMessage me = new MapMessage(mapContextName, copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY, - false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes()); + false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getPrimary(),entry.getBackupNodes()); list.add(me); } } @@ -534,9 +536,9 @@ try { mapmsg.deserialize(getExternalLoaders()); if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getBackupNodes()[0]); + mapMemberAdded(mapmsg.getPrimary()); } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { - memberAlive(mapmsg.getBackupNodes()[0]); + memberAlive(mapmsg.getPrimary()); } } catch (IOException x ) { log.error("Unable to deserialize MapMessage.",x); @@ -565,11 +567,11 @@ if ( log.isTraceEnabled() ) log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg); if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getBackupNodes()[0]); + mapMemberAdded(mapmsg.getPrimary()); } if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { - memberDisappeared(mapmsg.getBackupNodes()[0]); + memberDisappeared(mapmsg.getPrimary()); } if (mapmsg.getMsgType() == 
MapMessage.MSG_PROXY) { @@ -579,11 +581,13 @@ entry.setBackup(false); entry.setProxy(true); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); super.put(entry.getKey(), entry); } else { entry.setProxy(true); entry.setBackup(false); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); } } @@ -598,6 +602,7 @@ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); } @@ -605,6 +610,7 @@ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue(); if (mapmsg.isDiff()) { @@ -663,6 +669,7 @@ try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { log.error("Unable to select backup node.", x); } //catch @@ -700,6 +707,7 @@ synchronized (mapMembers) { removed = (mapMembers.remove(member) != null ); } + Iterator i = super.entrySet().iterator(); while (i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); @@ -708,10 +716,36 @@ try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); } + } else if (member.equals(entry.getPrimary())) { + entry.setPrimary(null); } //end if + + if ( entry.isProxy() && + entry.getPrimary() == null && + entry.getBackupNodes()!=null && + 
entry.getBackupNodes().length == 1 && + entry.getBackupNodes()[0].equals(member) ) { + //remove proxies that have no backup nor primaries + i.remove(); + } else if ( entry.isBackup() && + entry.getBackupNodes()!=null && + entry.getBackupNodes().length == 1 && + entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) { + try { + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackup(false); + entry.setProxy(false); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } + } //while } @@ -761,7 +795,7 @@ try { if (getMapMembers().length > 0 && notify) { - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null); + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null); getChannel().send(getMapMembers(), msg, getChannelSendOptions()); } } catch ( ChannelException x ) { @@ -786,7 +820,7 @@ if ( !entry.isBackup() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, - (Serializable) key, null, null, null); + (Serializable) key, null, null, null,null); Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout()); if (resp == null || resp.length == 0) { //no responses @@ -807,13 +841,13 @@ backup = publishEntryInfo(key, entry.getValue()); } else if ( entry.isProxy() ) { //invalidate the previous primary - msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); + msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); Member[] dest = getMapMembersExcl(backup); if ( 
dest!=null && dest.length >0) { getChannel().send(dest, msg, getChannelSendOptions()); } } - + entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); @@ -875,6 +909,7 @@ MapEntry entry = new MapEntry(key,value); entry.setBackup(false); entry.setProxy(false); + entry.setPrimary(channel.getLocalMember(false)); Object old = null; @@ -963,7 +998,7 @@ Map.Entry e = (Map.Entry)i.next(); Object key = e.getKey(); MapEntry entry = (MapEntry)super.get(key); - if ( entry.isPrimary() ) set.add(entry.getValue()); + if ( entry != null && entry.isPrimary() ) set.add(entry.getValue()); } return Collections.unmodifiableSet(set); } @@ -977,7 +1012,7 @@ Map.Entry e = (Map.Entry)i.next(); Object key = e.getKey(); MapEntry entry = (MapEntry)super.get(key); - if ( entry.isPrimary() ) set.add(key); + if ( entry!=null && entry.isPrimary() ) set.add(key); } return Collections.unmodifiableSet(set); @@ -1026,7 +1061,7 @@ private boolean backup; private boolean proxy; private Member[] backupNodes; - + private Member primary; private Object key; private Object value; @@ -1080,6 +1115,14 @@ public Member[] getBackupNodes() { return backupNodes; } + + public void setPrimary(Member m) { + primary = m; + } + + public Member getPrimary() { + return primary; + } public Object getValue() { return value; @@ -1172,6 +1215,7 @@ private byte[] keydata; private byte[] diffvalue; private Member[] nodes; + private Member primary; public String toString() { StringBuffer buf = new StringBuffer("MapMessage[context="); @@ -1205,7 +1249,7 @@ public MapMessage(byte[] mapId,int msgtype, boolean diff, Serializable key, Serializable value, - byte[] diffvalue, Member[] nodes) { + byte[] diffvalue, Member primary, Member[] nodes) { this.mapId = mapId; this.msgtype = msgtype; this.diff = diff; @@ -1213,6 +1257,7 @@ this.value = value; this.diffvalue = diffvalue; this.nodes = nodes; + this.primary = primary; setValue(value); setKey(key); } @@ -1283,6 
+1328,14 @@ private void setBackUpNodes(Member[] nodes) { this.nodes = nodes; } + + public Member getPrimary() { + return primary; + } + + private void setPrimary(Member m) { + primary = m; + } public byte[] getMapId() { return mapId; @@ -1335,7 +1388,7 @@ * @return Object */ public Object clone() { - MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes); + MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes); msg.keydata = this.keydata; msg.valuedata = this.valuedata; return msg; Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Aug 9 16:30:41 2007 @@ -151,7 +151,7 @@ backup = wrap(next); //publish the backup data to one node msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, - (Serializable) key, (Serializable) value, null, backup); + (Serializable) key, (Serializable) value, null, channel.getLocalMember(false), backup); if ( log.isTraceEnabled() ) log.trace("Publishing backup data:"+msg+" to: "+next.getName()); UniqueId id = getChannel().send(backup, msg, getChannelSendOptions()); @@ -167,7 +167,7 @@ Member[] proxies = excludeFromSet(backup, getMapMembers()); if (success && proxies.length > 0 ) { msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, - (Serializable) key, null, null, backup); + (Serializable) key, null, null, channel.getLocalMember(false),backup); if ( log.isTraceEnabled() ) log.trace("Publishing proxy 
data:"+msg+" to: "+Arrays.toNameString(proxies)); getChannel().send(proxies, msg, getChannelSendOptions()); Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Thu Aug 9 16:30:41 2007 @@ -112,7 +112,7 @@ //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, - (Serializable) key, (Serializable) value, null, backup); + (Serializable) key, (Serializable) value, null,channel.getLocalMember(false), backup); getChannel().send(getMapMembers(), msg, getChannelSendOptions()); Modified: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java?view=diff&rev=564422&r1=564421&r2=564422 ============================================================================== --- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Aug 9 16:30:41 2007 @@ -166,6 +166,7 @@ String[] columnNames = { "Key", "Value", + "Primary Node", "Backup Node", "isPrimary", "isProxy", @@ -198,10 +199,11 @@ switch (col) { case 0: return entry.getKey(); case 1: return entry.getValue(); - case 2: return getMemberNames(entry.getBackupNodes()); - case 3: return new Boolean(entry.isPrimary()); - case 4: return new Boolean(entry.isProxy()); - case 5: return new Boolean(entry.isBackup()); + case 2: return entry.getPrimary()!=null?entry.getPrimary().getName():"null"; + case 3: return 
getMemberNames(entry.getBackupNodes()); + case 4: return new Boolean(entry.isPrimary()); + case 5: return new Boolean(entry.isProxy()); + case 6: return new Boolean(entry.isBackup()); default: return ""; } @@ -408,9 +410,9 @@ cell.setBackground(Color.WHITE); if ( row > 0 ) { Color color = null; - boolean primary = ( (Boolean) table.getValueAt(row, 3)).booleanValue(); - boolean proxy = ( (Boolean) table.getValueAt(row, 4)).booleanValue(); - boolean backup = ( (Boolean) table.getValueAt(row, 5)).booleanValue(); + boolean primary = ( (Boolean) table.getValueAt(row, 4)).booleanValue(); + boolean proxy = ( (Boolean) table.getValueAt(row, 5)).booleanValue(); + boolean backup = ( (Boolean) table.getValueAt(row, 6)).booleanValue(); if (primary) color = Color.GREEN; else if (proxy) color = Color.RED; else if (backup) color = Color.BLUE; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]