Author: kfujino
Date: Thu Apr 23 08:51:18 2015
New Revision: 1675559
URL: http://svn.apache.org/r1675559
Log:
Backport r1653423, r1660266, r1671471.
- Clarify the handling of Copy message and Copy nodes.
- Make sure that add to the backup node of the map entry when map member has
been added to ReplicatedMap.
- Avoid unnecessary call of DeltaRequest#addSessionListener in non-primary
nodes.
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java
tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java
(original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java
Thu Apr 23 08:51:18 2015
@@ -209,9 +209,9 @@ public class DeltaRequest implements Ext
case TYPE_LISTENER:
SessionListener listener = (SessionListener)
info.getValue();
if (info.getAction() == ACTION_SET) {
- session.addSessionListener(listener);
+ session.addSessionListener(listener,false);
} else {
- session.removeSessionListener(listener);
+ session.removeSessionListener(listener,false);
}
break;
default :
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java
(original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java
Thu Apr 23 08:51:18 2015
@@ -515,10 +515,14 @@ public class DeltaSession extends Standa
@Override
public void addSessionListener(SessionListener listener) {
+ addSessionListener(listener, true);
+ }
+
+ public void addSessionListener(SessionListener listener, boolean
addDeltaRequest) {
lock();
try {
super.addSessionListener(listener);
- if (deltaRequest != null && listener instanceof
ReplicatedSessionListener) {
+ if (addDeltaRequest && deltaRequest != null && listener instanceof
ReplicatedSessionListener) {
deltaRequest.addSessionListener(listener);
}
} finally {
@@ -528,10 +532,14 @@ public class DeltaSession extends Standa
@Override
public void removeSessionListener(SessionListener listener) {
+ removeSessionListener(listener, true);
+ }
+
+ public void removeSessionListener(SessionListener listener, boolean
addDeltaRequest) {
lock();
try {
super.removeSessionListener(listener);
- if (deltaRequest != null && listener instanceof
ReplicatedSessionListener) {
+ if (addDeltaRequest && deltaRequest != null && listener instanceof
ReplicatedSessionListener) {
deltaRequest.removeSessionListener(listener);
}
} finally {
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
---
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
(original)
+++
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Thu Apr 23 08:51:18 2015
@@ -79,10 +79,12 @@ public abstract class AbstractReplicated
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
- private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
+ protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
protected abstract int getStateMessageType();
+ protected abstract int getReplicateMessageType();
+
/**
* Timeout for RPC messages, how long we will wait for a reply
@@ -433,7 +435,7 @@ public abstract class AbstractReplicated
rentry.lock();
try {
//construct a diff message
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ msg = new MapMessage(mapContextName,
getReplicateMessageType(),
true, (Serializable) entry.getKey(),
null,
rentry.getDiff(),
entry.getPrimary(),
@@ -447,7 +449,7 @@ public abstract class AbstractReplicated
}
if (msg == null && complete) {
//construct a complete
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ msg = new MapMessage(mapContextName, getReplicateMessageType(),
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
null,
entry.getPrimary(),entry.getBackupNodes());
@@ -639,6 +641,7 @@ public abstract class AbstractReplicated
}
entry.setProxy(true);
entry.setBackup(false);
+ entry.setCopy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
}
@@ -653,6 +656,7 @@ public abstract class AbstractReplicated
entry = new MapEntry<K,V>((K) mapmsg.getKey(), (V)
mapmsg.getValue());
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
+ entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof
ReplicatedMapEntry ) {
@@ -661,6 +665,7 @@ public abstract class AbstractReplicated
} else {
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
+ entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
if (entry.getValue() instanceof ReplicatedMapEntry) {
@@ -708,6 +713,14 @@ public abstract class AbstractReplicated
}
}
}
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) {
+ MapEntry<K, V> entry = innerMap.get(mapmsg.getKey());
+ if (entry != null) {
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
+ }
+ }
}
@Override
@@ -826,6 +839,7 @@ public abstract class AbstractReplicated
entry.setPrimary(channel.getLocalMember(false));
entry.setBackup(false);
entry.setProxy(false);
+ entry.setCopy(false);
Member[] backup = publishEntryInfo(entry.getKey(),
entry.getValue());
entry.setBackupNodes(backup);
if ( mapOwner!=null )
mapOwner.objectMadePrimay(entry.getKey(),entry.getValue());
@@ -912,7 +926,10 @@ public abstract class AbstractReplicated
try {
Member[] backup = null;
MapMessage msg = null;
- if ( !entry.isBackup() ) {
+ if (entry.isBackup()) {
+ //select a new backup node
+ backup = publishEntryInfo(key, entry.getValue());
+ } else if ( entry.isProxy() ) {
//make sure we don't retrieve from ourselves
msg = new MapMessage(getMapContextName(),
MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null,
null,null);
@@ -925,31 +942,31 @@ public abstract class AbstractReplicated
msg = (MapMessage) resp[0].getMessage();
msg.deserialize(getExternalLoaders());
backup = entry.getBackupNodes();
- if ( entry.getValue() instanceof ReplicatedMapEntry ) {
- ReplicatedMapEntry val =
(ReplicatedMapEntry)entry.getValue();
- val.setOwner(getMapOwner());
- }
if ( msg.getValue()!=null ) entry.setValue((V)
msg.getValue());
- }
- if (entry.isBackup()) {
- //select a new backup node
- backup = publishEntryInfo(key, entry.getValue());
- } else if ( entry.isProxy() ) {
+
//invalidate the previous primary
msg = new
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
}
- if ( entry.getValue() != null && entry.getValue()
instanceof ReplicatedMapEntry ) {
+ if (entry.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry val =
(ReplicatedMapEntry)entry.getValue();
val.setOwner(getMapOwner());
}
+ } else if ( entry.isCopy() ) {
+ backup = getMapMembers();
+ if (backup.length > 0) {
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+
(Serializable)key,null,null,channel.getLocalMember(false),backup);
+ getChannel().send(backup, msg,
getChannelSendOptions());
+ }
}
entry.setPrimary(channel.getLocalMember(false));
entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
+ entry.setCopy(false);
if ( getMapOwner()!=null ) getMapOwner().objectMadePrimay(key,
entry.getValue());
} catch (Exception x) {
@@ -1006,6 +1023,7 @@ public abstract class AbstractReplicated
MapEntry<K,V> entry = new MapEntry<K,V>(key,value);
entry.setBackup(false);
entry.setProxy(false);
+ entry.setCopy(false);
entry.setPrimary(channel.getLocalMember(false));
V old = null;
@@ -1164,6 +1182,7 @@ public abstract class AbstractReplicated
public static class MapEntry<K,V> implements Map.Entry<K,V> {
private boolean backup;
private boolean proxy;
+ private boolean copy;
private Member[] backupNodes;
private Member primary;
private K key;
@@ -1200,7 +1219,7 @@ public abstract class AbstractReplicated
}
public boolean isPrimary() {
- return (!proxy && !backup);
+ return (!proxy && !backup && !copy);
}
public boolean isActive() {
@@ -1211,6 +1230,14 @@ public abstract class AbstractReplicated
this.proxy = proxy;
}
+ public boolean isCopy() {
+ return copy;
+ }
+
+ public void setCopy(boolean copy) {
+ this.copy = copy;
+ }
+
public boolean isDiffable() {
return (value instanceof ReplicatedMapEntry) &&
((ReplicatedMapEntry)value).isDiffable();
@@ -1322,6 +1349,7 @@ public abstract class AbstractReplicated
public static final int MSG_COPY = 9;
public static final int MSG_STATE_COPY = 10;
public static final int MSG_ACCESS = 11;
+ public static final int MSG_NOTIFY_MAPMEMBER = 12;
private byte[] mapId;
private int msgtype;
@@ -1360,6 +1388,7 @@ public abstract class AbstractReplicated
case MSG_STATE_COPY: return "MSG_STATE_COPY";
case MSG_COPY: return "MSG_COPY";
case MSG_ACCESS: return "MSG_ACCESS";
+ case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER";
default : return "UNKNOWN";
}
}
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
---
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Thu Apr 23 08:51:18 2015
@@ -125,6 +125,11 @@ public class LazyReplicatedMap<K,V> exte
return AbstractReplicatedMap.MapMessage.MSG_STATE;
}
+ @Override
+ protected int getReplicateMessageType() {
+ return AbstractReplicatedMap.MapMessage.MSG_BACKUP;
+ }
+
/**
* publish info about a map pair (key/value) to other nodes in the cluster
* @param key Object
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
---
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
(original)
+++
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Thu Apr 23 08:51:18 2015
@@ -17,10 +17,14 @@
package org.apache.catalina.tribes.tipis;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
/**
* All-to-all replication for a hash map implementation. Each node in the
cluster will carry an identical
@@ -50,6 +54,8 @@ public class ReplicatedMap<K,V> extends
private static final long serialVersionUID = 1L;
+ private final Log log = LogFactory.getLog(ReplicatedMap.class);
+
//--------------------------------------------------------------------------
// CONSTRUCTORS / DESTRUCTORS
//--------------------------------------------------------------------------
@@ -105,7 +111,12 @@ public class ReplicatedMap<K,V> extends
protected int getStateMessageType() {
return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
}
-
+
+ @Override
+ protected int getReplicateMessageType() {
+ return AbstractReplicatedMap.MapMessage.MSG_COPY;
+ }
+
/**
* publish info about a map pair (key/value) to other nodes in the cluster
* @param key Object
@@ -130,4 +141,94 @@ public class ReplicatedMap<K,V> extends
return backup;
}
+ @Override
+ public void memberDisappeared(Member member) {
+ boolean removed = false;
+ synchronized (mapMembers) {
+ removed = (mapMembers.remove(member) != null );
+ if (!removed) {
+ if (log.isDebugEnabled()) log.debug("Member["+member+"]
disappeared, but was not present in the map.");
+ return; //the member was not part of our map.
+ }
+ }
+ if (log.isInfoEnabled())
+ log.info("Member["+member+"] disappeared. Related map entries will
be relocated to the new node.");
+ long start = System.currentTimeMillis();
+ Iterator<Map.Entry<K,MapEntry<K,V>>> i =
innerMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<K,MapEntry<K,V>> e = i.next();
+ MapEntry<K,V> entry = innerMap.get(e.getKey());
+ if (entry==null) continue;
+ if (entry.isPrimary()) {
+ try {
+ Member[] backup = getMapMembers();
+ if (backup.length > 0) {
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+
(Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+ getChannel().send(backup, msg,
getChannelSendOptions());
+ }
+ entry.setBackupNodes(backup);
+ entry.setPrimary(channel.getLocalMember(false));
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a
new backup node", x);
+ }
+ } else if (member.equals(entry.getPrimary())) {
+ entry.setPrimary(null);
+ }
+
+ if ( entry.getPrimary() == null &&
+ entry.isCopy() &&
+ entry.getBackupNodes()!=null &&
+ entry.getBackupNodes().length > 0 &&
+
entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+ try {
+ entry.setPrimary(channel.getLocalMember(false));
+ entry.setBackup(false);
+ entry.setProxy(false);
+ entry.setCopy(false);
+ Member[] backup = getMapMembers();
+ if (backup.length > 0) {
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+
(Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+ getChannel().send(backup, msg,
getChannelSendOptions());
+ }
+ entry.setBackupNodes(backup);
+ if ( mapOwner!=null )
mapOwner.objectMadePrimay(entry.getKey(),entry.getValue());
+
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a
new backup node", x);
+ }
+ }
+
+ } //while
+ long complete = System.currentTimeMillis() - start;
+ if (log.isInfoEnabled()) log.info("Relocation of map entries was
complete in " + complete + " ms.");
+ }
+
+ @Override
+ public void mapMemberAdded(Member member) {
+ if ( member.equals(getChannel().getLocalMember(false)) ) return;
+ boolean memberAdded = false;
+ synchronized (mapMembers) {
+ if (!mapMembers.containsKey(member) ) {
+ mapMembers.put(member, new Long(System.currentTimeMillis()));
+ memberAdded = true;
+ }
+ }
+ if ( memberAdded ) {
+ synchronized (stateMutex) {
+ Member[] backup = getMapMembers();
+ Iterator<Map.Entry<K,MapEntry<K,V>>> i =
innerMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<K,MapEntry<K,V>> e = i.next();
+ MapEntry<K,V> entry = innerMap.get(e.getKey());
+ if ( entry == null ) continue;
+ if (entry.isPrimary() &&
!inSet(member,entry.getBackupNodes())) {
+ entry.setBackupNodes(backup);
+ }
+ }
+ }
+ }
+ }
+
}
\ No newline at end of file
Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Thu Apr 23 08:51:18 2015
@@ -136,6 +136,10 @@
to handle nodes being added and removed from the Cluster at run time.
(markt)
</fix>
+ <fix>
+ Avoid unnecessary call of
<code>DeltaRequest.addSessionListener()</code>
+ in non-primary nodes. (kfujino)
+ </fix>
</changelog>
</subsection>
<subsection name="WebSocket">
@@ -179,6 +183,26 @@
This fix ensures that <code>MapOwner</code> is set to
<code>ReplicatedMapEntry</code>. (kfujino)
</fix>
+ <fix>
+ Clarify the handling of Copy message and Copy nodes. (kfujino)
+ </fix>
+ <fix>
+ Copy node does not need to send the entry data. It is enough to send
+ only the node information of the entry. (kfujino)
+ </fix>
+ <fix>
+ <code>ReplicatedMap</code> should send the Copy message when
+ replicating. (kfujino)
+ </fix>
+ <fix>
+ Fix behavior of <code>ReplicatedMap</code> when member has disappeared.
+ If map entry is primary, rebuild the backup members. If primary node of
+ map entry has disappeared, backup node is promoted to primary.
(kfujino)
+ </fix>
+ <fix>
+ When a map member has been added to <code>ReplicatedMap</code>, make
+ sure to add it to backup nodes list of all other members. (kfujino)
+ </fix>
</changelog>
</subsection>
</section>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]