Author: fhanik
Date: Thu Aug 9 16:30:41 2007
New Revision: 564422
URL: http://svn.apache.org/viewvc?view=rev&rev=564422
Log:
backport from trunk
http://issues.apache.org/bugzilla/show_bug.cgi?id=43053
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Thu Aug 9 16:30:41 2007
@@ -239,7 +239,8 @@
null,
null,
null,
- wrap(channel.getLocalMember(false)));
+ channel.getLocalMember(false),
+ new Member[0]);
if ( channel.getMembers().length > 0 ) {
//send a ping, wait for all nodes to reply
Response[] resp = rpcChannel.send(channel.getMembers(),
@@ -287,7 +288,7 @@
protected void broadcast(int msgtype, boolean rpc) throws ChannelException
{
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
- false, null, null, null,
wrap(channel.getLocalMember(false)));
+ false, null, null, null,
channel.getLocalMember(false), new Member[0]);
if ( rpc) {
Response[] resp = rpcChannel.send(channel.getMembers(), msg,
rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
for (int i = 0; i < resp.length; i++) {
@@ -391,6 +392,7 @@
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
true, (Serializable) entry.getKey(),
null,
rentry.getDiff(),
+ entry.getPrimary(),
entry.getBackupNodes());
} catch (IOException x) {
log.error("Unable to diff object. Will replicate the
entire object instead.", x);
@@ -404,7 +406,7 @@
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
- null, entry.getBackupNodes());
+ null,
entry.getPrimary(),entry.getBackupNodes());
}
try {
@@ -439,7 +441,7 @@
Member backup = members.length > 0 ? (Member) members[0] : null;
if (backup != null) {
MapMessage msg = new MapMessage(mapContextName,
getStateMessageType(), false,
- null, null, null, null);
+ null, null, null, null, null);
Response[] resp = rpcChannel.send(new Member[] {backup}, msg,
rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
if (resp.length > 0) {
synchronized (stateMutex) {
@@ -506,7 +508,7 @@
boolean copy = (mapmsg.getMsgType() ==
mapmsg.MSG_STATE_COPY);
MapMessage me = new MapMessage(mapContextName,
copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(),
copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
+ false, (Serializable) entry.getKey(),
copy?(Serializable) entry.getValue():null, null,
entry.getPrimary(),entry.getBackupNodes());
list.add(me);
}
}
@@ -534,9 +536,9 @@
try {
mapmsg.deserialize(getExternalLoaders());
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ mapMemberAdded(mapmsg.getPrimary());
} else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
- memberAlive(mapmsg.getBackupNodes()[0]);
+ memberAlive(mapmsg.getPrimary());
}
} catch (IOException x ) {
log.error("Unable to deserialize MapMessage.",x);
@@ -565,11 +567,11 @@
if ( log.isTraceEnabled() )
log.trace("Map message received from:"+sender.getName()+"
msg:"+mapmsg);
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ mapMemberAdded(mapmsg.getPrimary());
}
if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
- memberDisappeared(mapmsg.getBackupNodes()[0]);
+ memberDisappeared(mapmsg.getPrimary());
}
if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
@@ -579,11 +581,13 @@
entry.setBackup(false);
entry.setProxy(true);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
super.put(entry.getKey(), entry);
} else {
entry.setProxy(true);
entry.setBackup(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
}
}
@@ -598,6 +602,7 @@
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof
ReplicatedMapEntry ) {
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
}
@@ -605,6 +610,7 @@
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
if (entry.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry diff = (ReplicatedMapEntry)
entry.getValue();
if (mapmsg.isDiff()) {
@@ -663,6 +669,7 @@
try {
Member[] backup = publishEntryInfo(entry.getKey(),
entry.getValue());
entry.setBackupNodes(backup);
+ entry.setPrimary(channel.getLocalMember(false));
} catch (ChannelException x) {
log.error("Unable to select backup node.", x);
} //catch
@@ -700,6 +707,7 @@
synchronized (mapMembers) {
removed = (mapMembers.remove(member) != null );
}
+
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
@@ -708,10 +716,36 @@
try {
Member[] backup = publishEntryInfo(entry.getKey(),
entry.getValue());
entry.setBackupNodes(backup);
+ entry.setPrimary(channel.getLocalMember(false));
} catch (ChannelException x) {
log.error("Unable to relocate[" + entry.getKey() + "] to a
new backup node", x);
}
+ } else if (member.equals(entry.getPrimary())) {
+ entry.setPrimary(null);
} //end if
+
+ if ( entry.isProxy() &&
+ entry.getPrimary() == null &&
+ entry.getBackupNodes()!=null &&
+ entry.getBackupNodes().length == 1 &&
+ entry.getBackupNodes()[0].equals(member) ) {
+ //remove proxies that have no backup nor primaries
+ i.remove();
+ } else if ( entry.isBackup() &&
+ entry.getBackupNodes()!=null &&
+ entry.getBackupNodes().length == 1 &&
+
entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+ try {
+ entry.setPrimary(channel.getLocalMember(false));
+ entry.setBackup(false);
+ entry.setProxy(false);
+ Member[] backup = publishEntryInfo(entry.getKey(),
entry.getValue());
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a
new backup node", x);
+ }
+ }
+
} //while
}
@@ -761,7 +795,7 @@
try {
if (getMapMembers().length > 0 && notify) {
- MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null);
getChannel().send(getMapMembers(), msg,
getChannelSendOptions());
}
} catch ( ChannelException x ) {
@@ -786,7 +820,7 @@
if ( !entry.isBackup() ) {
//make sure we don't retrieve from ourselves
msg = new MapMessage(getMapContextName(),
MapMessage.MSG_RETRIEVE_BACKUP, false,
- (Serializable) key, null, null, null);
+ (Serializable) key, null, null,
null,null);
Response[] resp =
getRpcChannel().send(entry.getBackupNodes(),msg,
this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT,
getRpcTimeout());
if (resp == null || resp.length == 0) {
//no responses
@@ -807,13 +841,13 @@
backup = publishEntryInfo(key, entry.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
- msg = new
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+ msg = new
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
}
}
-
+ entry.setPrimary(channel.getLocalMember(false));
entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
@@ -875,6 +909,7 @@
MapEntry entry = new MapEntry(key,value);
entry.setBackup(false);
entry.setProxy(false);
+ entry.setPrimary(channel.getLocalMember(false));
Object old = null;
@@ -963,7 +998,7 @@
Map.Entry e = (Map.Entry)i.next();
Object key = e.getKey();
MapEntry entry = (MapEntry)super.get(key);
- if ( entry.isPrimary() ) set.add(entry.getValue());
+ if ( entry != null && entry.isPrimary() )
set.add(entry.getValue());
}
return Collections.unmodifiableSet(set);
}
@@ -977,7 +1012,7 @@
Map.Entry e = (Map.Entry)i.next();
Object key = e.getKey();
MapEntry entry = (MapEntry)super.get(key);
- if ( entry.isPrimary() ) set.add(key);
+ if ( entry!=null && entry.isPrimary() ) set.add(key);
}
return Collections.unmodifiableSet(set);
@@ -1026,7 +1061,7 @@
private boolean backup;
private boolean proxy;
private Member[] backupNodes;
-
+ private Member primary;
private Object key;
private Object value;
@@ -1080,6 +1115,14 @@
public Member[] getBackupNodes() {
return backupNodes;
}
+
+ public void setPrimary(Member m) {
+ primary = m;
+ }
+
+ public Member getPrimary() {
+ return primary;
+ }
public Object getValue() {
return value;
@@ -1172,6 +1215,7 @@
private byte[] keydata;
private byte[] diffvalue;
private Member[] nodes;
+ private Member primary;
public String toString() {
StringBuffer buf = new StringBuffer("MapMessage[context=");
@@ -1205,7 +1249,7 @@
public MapMessage(byte[] mapId,int msgtype, boolean diff,
Serializable key, Serializable value,
- byte[] diffvalue, Member[] nodes) {
+ byte[] diffvalue, Member primary, Member[] nodes) {
this.mapId = mapId;
this.msgtype = msgtype;
this.diff = diff;
@@ -1213,6 +1257,7 @@
this.value = value;
this.diffvalue = diffvalue;
this.nodes = nodes;
+ this.primary = primary;
setValue(value);
setKey(key);
}
@@ -1283,6 +1328,14 @@
private void setBackUpNodes(Member[] nodes) {
this.nodes = nodes;
}
+
+ public Member getPrimary() {
+ return primary;
+ }
+
+ private void setPrimary(Member m) {
+ primary = m;
+ }
public byte[] getMapId() {
return mapId;
@@ -1335,7 +1388,7 @@
* @return Object
*/
public Object clone() {
- MapMessage msg = new MapMessage(this.mapId, this.msgtype,
this.diff, this.key, this.value, this.diffvalue, this.nodes);
+ MapMessage msg = new MapMessage(this.mapId, this.msgtype,
this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes);
msg.keydata = this.keydata;
msg.valuedata = this.valuedata;
return msg;
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Thu Aug 9 16:30:41 2007
@@ -151,7 +151,7 @@
backup = wrap(next);
//publish the backup data to one node
msg = new MapMessage(getMapContextName(),
MapMessage.MSG_BACKUP, false,
- (Serializable) key, (Serializable) value,
null, backup);
+ (Serializable) key, (Serializable) value,
null, channel.getLocalMember(false), backup);
if ( log.isTraceEnabled() )
log.trace("Publishing backup data:"+msg+" to:
"+next.getName());
UniqueId id = getChannel().send(backup, msg,
getChannelSendOptions());
@@ -167,7 +167,7 @@
Member[] proxies = excludeFromSet(backup, getMapMembers());
if (success && proxies.length > 0 ) {
msg = new MapMessage(getMapContextName(),
MapMessage.MSG_PROXY, false,
- (Serializable) key, null, null,
backup);
+ (Serializable) key, null, null,
channel.getLocalMember(false),backup);
if ( log.isTraceEnabled() )
log.trace("Publishing proxy data:"+msg+" to:
"+Arrays.toNameString(proxies));
getChannel().send(proxies, msg, getChannelSendOptions());
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Thu Aug 9 16:30:41 2007
@@ -112,7 +112,7 @@
//publish the data out to all nodes
MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_COPY, false,
- (Serializable) key, (Serializable)
value, null, backup);
+ (Serializable) key, (Serializable)
value, null,channel.getLocalMember(false), backup);
getChannel().send(getMapMembers(), msg, getChannelSendOptions());
Modified:
tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java?view=diff&rev=564422&r1=564421&r2=564422
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java
(original)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/demos/MapDemo.java Thu
Aug 9 16:30:41 2007
@@ -166,6 +166,7 @@
String[] columnNames = {
"Key",
"Value",
+ "Primary Node",
"Backup Node",
"isPrimary",
"isProxy",
@@ -198,10 +199,11 @@
switch (col) {
case 0: return entry.getKey();
case 1: return entry.getValue();
- case 2: return getMemberNames(entry.getBackupNodes());
- case 3: return new Boolean(entry.isPrimary());
- case 4: return new Boolean(entry.isProxy());
- case 5: return new Boolean(entry.isBackup());
+ case 2: return
entry.getPrimary()!=null?entry.getPrimary().getName():"null";
+ case 3: return getMemberNames(entry.getBackupNodes());
+ case 4: return new Boolean(entry.isPrimary());
+ case 5: return new Boolean(entry.isProxy());
+ case 6: return new Boolean(entry.isBackup());
default: return "";
}
@@ -408,9 +410,9 @@
cell.setBackground(Color.WHITE);
if ( row > 0 ) {
Color color = null;
- boolean primary = ( (Boolean) table.getValueAt(row,
3)).booleanValue();
- boolean proxy = ( (Boolean) table.getValueAt(row,
4)).booleanValue();
- boolean backup = ( (Boolean) table.getValueAt(row,
5)).booleanValue();
+ boolean primary = ( (Boolean) table.getValueAt(row,
4)).booleanValue();
+ boolean proxy = ( (Boolean) table.getValueAt(row,
5)).booleanValue();
+ boolean backup = ( (Boolean) table.getValueAt(row,
6)).booleanValue();
if (primary) color = Color.GREEN;
else if (proxy) color = Color.RED;
else if (backup) color = Color.BLUE;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]