This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push:
new 273a3d1c78 Code clean-up - no functional change
273a3d1c78 is described below
commit 273a3d1c7848adf97ecb1c790be7700f49f9bb71
Author: Mark Thomas <[email protected]>
AuthorDate: Fri May 10 14:39:40 2024 +0100
Code clean-up - no functional change
---
.../apache/catalina/tribes/jmx/JmxRegistry.java | 8 +-
.../tribes/tipis/AbstractReplicatedMap.java | 690 ++++++++++-----------
.../catalina/tribes/tipis/LazyReplicatedMap.java | 170 ++---
.../catalina/tribes/tipis/ReplicatedMap.java | 157 ++---
.../catalina/tribes/tipis/ReplicatedMapEntry.java | 34 +-
5 files changed, 539 insertions(+), 520 deletions(-)
diff --git a/java/org/apache/catalina/tribes/jmx/JmxRegistry.java
b/java/org/apache/catalina/tribes/jmx/JmxRegistry.java
index a22904d963..a833597fbe 100644
--- a/java/org/apache/catalina/tribes/jmx/JmxRegistry.java
+++ b/java/org/apache/catalina/tribes/jmx/JmxRegistry.java
@@ -36,7 +36,7 @@ public class JmxRegistry {
private static final Log log = LogFactory.getLog(JmxRegistry.class);
protected static final StringManager sm =
StringManager.getManager(JmxRegistry.class);
- private static ConcurrentHashMap<String, JmxRegistry> registryCache = new
ConcurrentHashMap<>();
+ private static ConcurrentHashMap<String,JmxRegistry> registryCache = new
ConcurrentHashMap<>();
private MBeanServer mbserver = ManagementFactory.getPlatformMBeanServer();
private ObjectName baseOname = null;
@@ -60,8 +60,8 @@ public class JmxRegistry {
if (!jmxChannel.isJmxEnabled()) {
return null;
}
- ObjectName baseOn = createBaseObjectName(jmxChannel.getJmxDomain(),
- jmxChannel.getJmxPrefix(), channel.getName());
+ ObjectName baseOn =
+ createBaseObjectName(jmxChannel.getJmxDomain(),
jmxChannel.getJmxPrefix(), channel.getName());
if (baseOn == null) {
return null;
}
@@ -131,7 +131,7 @@ public class JmxRegistry {
}
public void unregisterJmx(ObjectName oname) {
- if (oname ==null) {
+ if (oname == null) {
return;
}
try {
diff --git a/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
b/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
index 01b133d660..350d2436ac 100644
--- a/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
+++ b/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
@@ -52,9 +52,8 @@ import org.apache.juli.logging.LogFactory;
* @param <K> The type of Key
* @param <V> The type of Value
*/
-public abstract class AbstractReplicatedMap<K,V>
- implements Map<K,V>, Serializable, RpcCallback, ChannelListener,
- MembershipListener, Heartbeat {
+public abstract class AbstractReplicatedMap<K, V>
+ implements Map<K,V>, Serializable, RpcCallback, ChannelListener,
MembershipListener, Heartbeat {
private static final long serialVersionUID = 1L;
@@ -73,10 +72,10 @@ public abstract class AbstractReplicatedMap<K,V>
public static final float DEFAULT_LOAD_FACTOR = 0.75f;
-//------------------------------------------------------------------------------
-// INSTANCE VARIABLES
-//------------------------------------------------------------------------------
- protected final ConcurrentMap<K, MapEntry<K,V>> innerMap;
+ //
------------------------------------------------------------------------------
+ // INSTANCE VARIABLES
+ //
------------------------------------------------------------------------------
+ protected final ConcurrentMap<K,MapEntry<K,V>> innerMap;
protected abstract int getStateMessageType();
@@ -96,9 +95,7 @@ public abstract class AbstractReplicatedMap<K,V>
*/
protected transient RpcChannel rpcChannel;
/**
- * The Map context name makes this map unique, this
- * allows us to have more than one map shared
- * through one channel
+ * The Map context name makes this map unique, this allows us to have more
than one map shared through one channel
*/
protected transient byte[] mapContextName;
/**
@@ -112,7 +109,7 @@ public abstract class AbstractReplicatedMap<K,V>
/**
* A list of members in our map
*/
- protected final transient HashMap<Member, Long> mapMembers = new
HashMap<>();
+ protected final transient HashMap<Member,Long> mapMembers = new
HashMap<>();
/**
* Our default send options
*/
@@ -127,16 +124,13 @@ public abstract class AbstractReplicatedMap<K,V>
protected transient ClassLoader[] externalLoaders;
/**
- * The node we are currently backing up data to, this index will rotate
- * on a round robin basis
+ * The node we are currently backing up data to, this index will rotate on
a round robin basis
*/
protected transient int currentNode = 0;
/**
- * Since the map keeps internal membership
- * this is the timeout for a ping message to be responded to
- * If a remote map doesn't respond within this timeframe,
- * its considered dead.
+ * Since the map keeps internal membership this is the timeout for a ping
message to be responded to If a remote map
+ * doesn't respond within this timeframe, its considered dead.
*/
protected transient long accessTimeout = 5000;
@@ -150,39 +144,33 @@ public abstract class AbstractReplicatedMap<K,V>
*/
private transient volatile State state = State.NEW;
-//------------------------------------------------------------------------------
-// map owner interface
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // map owner interface
+ //
------------------------------------------------------------------------------
public interface MapOwner {
void objectMadePrimary(Object key, Object value);
}
-//------------------------------------------------------------------------------
-// CONSTRUCTORS
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // CONSTRUCTORS
+ //
------------------------------------------------------------------------------
/**
* Creates a new map.
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
- * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
* @param channelSendOptions Send options
- * @param cls - a list of classloaders to be used for deserialization of
objects.
- * @param terminate - Flag for whether to terminate this map that failed
to start.
+ * @param cls - a list of classloaders to be used for
deserialization of objects.
+ * @param terminate - Flag for whether to terminate this map that
failed to start.
*/
- public AbstractReplicatedMap(MapOwner owner,
- Channel channel,
- long timeout,
- String mapContextName,
- int initialCapacity,
- float loadFactor,
- int channelSendOptions,
- ClassLoader[] cls,
- boolean terminate) {
+ public AbstractReplicatedMap(MapOwner owner, Channel channel, long
timeout, String mapContextName,
+ int initialCapacity, float loadFactor, int channelSendOptions,
ClassLoader[] cls, boolean terminate) {
innerMap = new ConcurrentHashMap<>(initialCapacity, loadFactor, 15);
init(owner, channel, mapContextName, timeout, channelSendOptions, cls,
terminate);
@@ -190,30 +178,33 @@ public abstract class AbstractReplicatedMap<K,V>
/**
* Helper methods, wraps a single member in an array
+ *
* @param m Member
+ *
* @return Member[]
*/
protected Member[] wrap(Member m) {
- if ( m == null ) {
+ if (m == null) {
return new Member[0];
} else {
- return new Member[] {m};
+ return new Member[] { m };
}
}
/**
- * Initializes the map by creating the RPC channel, registering itself as
a channel listener
- * This method is also responsible for initiating the state transfer
- * @param owner Object
- * @param channel Channel
- * @param mapContextName String
- * @param timeout long
+ * Initializes the map by creating the RPC channel, registering itself as
a channel listener This method is also
+ * responsible for initiating the state transfer
+ *
+ * @param owner Object
+ * @param channel Channel
+ * @param mapContextName String
+ * @param timeout long
* @param channelSendOptions int
- * @param cls ClassLoader[]
- * @param terminate - Flag for whether to terminate this map that failed
to start.
+ * @param cls ClassLoader[]
+ * @param terminate - Flag for whether to terminate this map that
failed to start.
*/
- protected void init(MapOwner owner, Channel channel, String mapContextName,
- long timeout, int channelSendOptions,ClassLoader[] cls, boolean
terminate) {
+ protected void init(MapOwner owner, Channel channel, String
mapContextName, long timeout, int channelSendOptions,
+ ClassLoader[] cls, boolean terminate) {
long start = System.currentTimeMillis();
if (log.isInfoEnabled()) {
log.info(sm.getString("abstractReplicatedMap.init.start",
mapContextName));
@@ -225,64 +216,57 @@ public abstract class AbstractReplicatedMap<K,V>
this.rpcTimeout = timeout;
this.mapname = mapContextName;
- //unique context is more efficient if it is stored as bytes
+ // unique context is more efficient if it is stored as bytes
this.mapContextName =
mapContextName.getBytes(StandardCharsets.ISO_8859_1);
- if ( log.isTraceEnabled() ) {
- log.trace("Created Lazy Map with name:"+mapContextName+",
bytes:"+Arrays.toString(this.mapContextName));
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Created Lazy Map with name:" + mapContextName + ",
bytes:" + Arrays.toString(this.mapContextName));
}
- //create an rpc channel and add the map as a listener
+ // create an rpc channel and add the map as a listener
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
- //add this map as a message listener
+ // add this map as a message listener
this.channel.addChannelListener(this);
- //listen for membership notifications
+ // listen for membership notifications
this.channel.addMembershipListener(this);
try {
- //broadcast our map, this just notifies other members of our
existence
+ // broadcast our map, this just notifies other members of our
existence
broadcast(MapMessage.MSG_INIT, true);
- //transfer state from another map
+ // transfer state from another map
transferState();
- //state is transferred, we are ready for messaging
+ // state is transferred, we are ready for messaging
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
log.warn(sm.getString("abstractReplicatedMap.unableSend.startMessage"));
if (terminate) {
breakdown();
- throw new
RuntimeException(sm.getString("abstractReplicatedMap.unableStart"),x);
+ throw new
RuntimeException(sm.getString("abstractReplicatedMap.unableStart"), x);
}
}
this.state = State.INITIALIZED;
long complete = System.currentTimeMillis() - start;
if (log.isInfoEnabled()) {
- log.info(sm.getString("abstractReplicatedMap.init.completed",
- mapContextName, Long.toString(complete)));
+ log.info(sm.getString("abstractReplicatedMap.init.completed",
mapContextName, Long.toString(complete)));
}
}
/**
- * Sends a ping out to all the members in the cluster, not just map members
- * that this map is alive.
+ * Sends a ping out to all the members in the cluster, not just map
members that this map is alive.
+ *
* @param timeout long
+ *
* @throws ChannelException Send error
*/
protected void ping(long timeout) throws ChannelException {
- MapMessage msg = new MapMessage(this.mapContextName,
- MapMessage.MSG_PING,
- false,
- null,
- null,
- null,
- channel.getLocalMember(false),
- null);
- if ( channel.getMembers().length > 0 ) {
+ MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_PING, false, null, null, null,
+ channel.getLocalMember(false), null);
+ if (channel.getMembers().length > 0) {
try {
- //send a ping, wait for all nodes to reply
- Response[] resp = rpcChannel.send(channel.getMembers(),
- msg, RpcChannel.ALL_REPLY,
- (channelSendOptions),
- (int) accessTimeout);
+ // send a ping, wait for all nodes to reply
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg,
RpcChannel.ALL_REPLY, (channelSendOptions),
+ (int) accessTimeout);
for (Response response : resp) {
MapMessage mapMsg = (MapMessage) response.getMessage();
try {
@@ -294,8 +278,7 @@ public abstract class AbstractReplicatedMap<K,V>
} else if (state == State.STATETRANSFERRED) {
synchronized (mapMembers) {
if (log.isInfoEnabled()) {
-
log.info(sm.getString("abstractReplicatedMap.ping.stateTransferredMember",
- member));
+
log.info(sm.getString("abstractReplicatedMap.ping.stateTransferredMember",
member));
}
if (mapMembers.containsKey(member)) {
mapMembers.put(member,
Long.valueOf(System.currentTimeMillis()));
@@ -303,8 +286,7 @@ public abstract class AbstractReplicatedMap<K,V>
}
} else {
if (log.isInfoEnabled()) {
-
log.info(sm.getString("abstractReplicatedMap.mapMember.unavailable",
- member));
+
log.info(sm.getString("abstractReplicatedMap.mapMember.unavailable", member));
}
}
} catch (ClassNotFoundException | IOException e) {
@@ -320,22 +302,23 @@ public abstract class AbstractReplicatedMap<K,V>
throw ce;
}
}
- //update our map of members, expire some if we didn't receive a ping
back
+ // update our map of members, expire some if we didn't receive a ping
back
synchronized (mapMembers) {
Member[] members = mapMembers.keySet().toArray(new Member[0]);
long now = System.currentTimeMillis();
for (Member member : members) {
long access = mapMembers.get(member).longValue();
- if ( (now - access) > timeout ) {
+ if ((now - access) > timeout) {
log.warn(sm.getString("abstractReplicatedMap.ping.timeout", member, mapname));
memberDisappeared(member);
}
}
- }//synch
+ } // synch
}
/**
* We have received a member alive notification
+ *
* @param member Member
*/
protected void memberAlive(Member member) {
@@ -347,22 +330,23 @@ public abstract class AbstractReplicatedMap<K,V>
/**
* Helper method to broadcast a message to all members in a channel
+ *
* @param msgtype int
- * @param rpc boolean
+ * @param rpc boolean
+ *
* @throws ChannelException Send error
*/
protected void broadcast(int msgtype, boolean rpc) throws ChannelException
{
Member[] members = channel.getMembers();
// No destination.
- if (members.length == 0 ) {
+ if (members.length == 0) {
return;
}
- //send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName, msgtype,
- false, null, null, null,
channel.getLocalMember(false), null);
- if ( rpc) {
- Response[] resp = rpcChannel.send(members, msg,
- RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout);
+ // send out a map membership message, only wait for the first reply
+ MapMessage msg = new MapMessage(this.mapContextName, msgtype, false,
null, null, null,
+ channel.getLocalMember(false), null);
+ if (rpc) {
+ Response[] resp = rpcChannel.send(members, msg,
RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout);
if (resp.length > 0) {
for (Response response : resp) {
mapMemberAdded(response.getSource());
@@ -372,7 +356,7 @@ public abstract class AbstractReplicatedMap<K,V>
log.warn(sm.getString("abstractReplicatedMap.broadcast.noReplies"));
}
} else {
- channel.send(channel.getMembers(),msg,channelSendOptions);
+ channel.send(channel.getMembers(), msg, channelSendOptions);
}
}
@@ -382,8 +366,11 @@ public abstract class AbstractReplicatedMap<K,V>
this.rpcChannel.breakdown();
}
if (this.channel != null) {
- try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception
ignore){}
- //cleanup
+ try {
+ broadcast(MapMessage.MSG_STOP, false);
+ } catch (Exception ignore) {
+ }
+ // cleanup
this.channel.removeChannelListener(this);
this.channel.removeMembershipListener(this);
}
@@ -404,23 +391,24 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public boolean equals(Object o) {
- if ( !(o instanceof AbstractReplicatedMap)) {
+ if (!(o instanceof AbstractReplicatedMap)) {
return false;
}
- if ( !(o.getClass().equals(this.getClass())) ) {
+ if (!(o.getClass().equals(this.getClass()))) {
return false;
}
@SuppressWarnings("unchecked")
- AbstractReplicatedMap<K,V> other = (AbstractReplicatedMap<K,V>)o;
- return Arrays.equals(mapContextName,other.mapContextName);
+ AbstractReplicatedMap<K,V> other = (AbstractReplicatedMap<K,V>) o;
+ return Arrays.equals(mapContextName, other.mapContextName);
}
-//------------------------------------------------------------------------------
-// GROUP COM INTERFACES
-//------------------------------------------------------------------------------
- public Member[] getMapMembers(HashMap<Member, Long> members) {
+ //
------------------------------------------------------------------------------
+ // GROUP COM INTERFACES
+ //
------------------------------------------------------------------------------
+ public Member[] getMapMembers(HashMap<Member,Long> members) {
return members.keySet().toArray(new Member[0]);
}
+
public Member[] getMapMembers() {
synchronized (mapMembers) {
return getMapMembers(mapMembers);
@@ -433,7 +421,7 @@ public abstract class AbstractReplicatedMap<K,V>
}
synchronized (mapMembers) {
@SuppressWarnings("unchecked") // mapMembers has the correct type
- HashMap<Member, Long> list = (HashMap<Member,
Long>)mapMembers.clone();
+ HashMap<Member,Long> list = (HashMap<Member,Long>)
mapMembers.clone();
for (Member member : exclude) {
list.remove(member);
}
@@ -443,52 +431,49 @@ public abstract class AbstractReplicatedMap<K,V>
/**
- * Replicates any changes to the object since the last time
- * The object has to be primary, ie, if the object is a proxy or a backup,
it will not be replicated<br>
- * @param key The object to replicate
- * @param complete - if set to true, the object is replicated to its backup
- * if set to false, only objects that implement ReplicatedMapEntry and the
isDirty() returns true will
- * be replicated
+ * Replicates any changes to the object since the last time The object has
to be primary, ie, if the object is a
+ * proxy or a backup, it will not be replicated<br>
+ *
+ * @param key The object to replicate
+ * @param complete - if set to true, the object is replicated to its
backup if set to false, only objects that
+ * implement ReplicatedMapEntry and the isDirty()
returns true will be replicated
*/
public void replicate(Object key, boolean complete) {
- if ( log.isTraceEnabled() ) {
- log.trace("Replicate invoked on key:"+key);
+ if (log.isTraceEnabled()) {
+ log.trace("Replicate invoked on key:" + key);
}
MapEntry<K,V> entry = innerMap.get(key);
- if ( entry == null ) {
+ if (entry == null) {
return;
}
- if ( !entry.isSerializable() ) {
+ if (!entry.isSerializable()) {
return;
}
- if (entry.isPrimary() && entry.getBackupNodes()!= null &&
entry.getBackupNodes().length > 0) {
- //check to see if we need to replicate this object
isDirty()||complete || isAccessReplicate()
+ if (entry.isPrimary() && entry.getBackupNodes() != null &&
entry.getBackupNodes().length > 0) {
+ // check to see if we need to replicate this object
isDirty()||complete || isAccessReplicate()
ReplicatedMapEntry rentry = null;
if (entry.getValue() instanceof ReplicatedMapEntry) {
- rentry = (ReplicatedMapEntry)entry.getValue();
+ rentry = (ReplicatedMapEntry) entry.getValue();
}
boolean isDirty = rentry != null && rentry.isDirty();
boolean isAccess = rentry != null && rentry.isAccessReplicate();
boolean repl = complete || isDirty || isAccess;
if (!repl) {
- if ( log.isTraceEnabled() ) {
- log.trace("Not replicating:"+key+", no change made");
+ if (log.isTraceEnabled()) {
+ log.trace("Not replicating:" + key + ", no change made");
}
return;
}
- //check to see if the message is diffable
+ // check to see if the message is diffable
MapMessage msg = null;
if (rentry != null && rentry.isDiffable() && (isDirty ||
complete)) {
rentry.lock();
try {
- //construct a diff message
- msg = new MapMessage(mapContextName,
getReplicateMessageType(),
- true, (Serializable) entry.getKey(),
null,
- rentry.getDiff(),
- entry.getPrimary(),
- entry.getBackupNodes());
+ // construct a diff message
+ msg = new MapMessage(mapContextName,
getReplicateMessageType(), true, (Serializable) entry.getKey(),
+ null, rentry.getDiff(), entry.getPrimary(),
entry.getBackupNodes());
rentry.resetDiff();
} catch (IOException x) {
log.error(sm.getString("abstractReplicatedMap.unable.diffObject"), x);
@@ -497,20 +482,17 @@ public abstract class AbstractReplicatedMap<K,V>
}
}
if (msg == null && complete) {
- //construct a complete
- msg = new MapMessage(mapContextName, getReplicateMessageType(),
- false, (Serializable) entry.getKey(),
- (Serializable) entry.getValue(),
- null,
entry.getPrimary(),entry.getBackupNodes());
+ // construct a complete
+ msg = new MapMessage(mapContextName,
getReplicateMessageType(), false, (Serializable) entry.getKey(),
+ (Serializable) entry.getValue(), null,
entry.getPrimary(), entry.getBackupNodes());
}
if (msg == null) {
- //construct a access message
- msg = new MapMessage(mapContextName, MapMessage.MSG_ACCESS,
- false, (Serializable) entry.getKey(), null, null,
entry.getPrimary(),
- entry.getBackupNodes());
+ // construct a access message
+ msg = new MapMessage(mapContextName, MapMessage.MSG_ACCESS,
false, (Serializable) entry.getKey(), null,
+ null, entry.getPrimary(), entry.getBackupNodes());
}
try {
- if ( channel!=null && entry.getBackupNodes()!= null &&
entry.getBackupNodes().length > 0 ) {
+ if (channel != null && entry.getBackupNodes() != null &&
entry.getBackupNodes().length > 0) {
if (rentry != null) {
rentry.setLastTimeReplicated(System.currentTimeMillis());
}
@@ -519,18 +501,18 @@ public abstract class AbstractReplicatedMap<K,V>
} catch (ChannelException x) {
log.error(sm.getString("abstractReplicatedMap.unable.replicate"), x);
}
- } //end if
+ } // end if
}
/**
- * This can be invoked by a periodic thread to replicate out any changes.
- * For maps that don't store objects that implement ReplicatedMapEntry,
this
- * method should be used infrequently to avoid large amounts of data
transfer
+ * This can be invoked by a periodic thread to replicate out any changes.
For maps that don't store objects that
+ * implement ReplicatedMapEntry, this method should be used infrequently
to avoid large amounts of data transfer
+ *
* @param complete boolean
*/
public void replicate(boolean complete) {
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
replicate(e.getKey(), complete);
}
}
@@ -540,9 +522,10 @@ public abstract class AbstractReplicatedMap<K,V>
Member[] members = getMapMembers();
Member backup = members.length > 0 ? members[0] : null;
if (backup != null) {
- MapMessage msg = new MapMessage(mapContextName,
getStateMessageType(), false,
- null, null, null, null, null);
- Response[] resp = rpcChannel.send(new Member[] {backup}, msg,
RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
+ MapMessage msg =
+ new MapMessage(mapContextName, getStateMessageType(),
false, null, null, null, null, null);
+ Response[] resp = rpcChannel.send(new Member[] { backup },
msg, RpcChannel.FIRST_REPLY,
+ channelSendOptions, rpcTimeout);
if (resp.length > 0) {
synchronized (stateMutex) {
msg = (MapMessage) resp[0].getMessage();
@@ -550,7 +533,7 @@ public abstract class AbstractReplicatedMap<K,V>
ArrayList<?> list = (ArrayList<?>) msg.getValue();
for (Object o : list) {
messageReceived((Serializable) o,
resp[0].getSource());
- } //for
+ } // for
}
stateTransferred = true;
} else {
@@ -565,52 +548,53 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public Serializable replyRequest(Serializable msg, final Member sender) {
- if (! (msg instanceof MapMessage)) {
+ if (!(msg instanceof MapMessage)) {
return null;
}
MapMessage mapmsg = (MapMessage) msg;
- //map init request
+ // map init request
if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
mapmsg.setPrimary(channel.getLocalMember(false));
return mapmsg;
}
- //map start request
+ // map start request
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapmsg.setPrimary(channel.getLocalMember(false));
mapMemberAdded(sender);
return mapmsg;
}
- //backup request
+ // backup request
if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) {
MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
- if (entry == null || (!entry.isSerializable()) ) {
+ if (entry == null || (!entry.isSerializable())) {
return null;
}
- mapmsg.setValue( (Serializable) entry.getValue());
+ mapmsg.setValue((Serializable) entry.getValue());
return mapmsg;
}
- //state transfer request
+ // state transfer request
if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType()
== MapMessage.MSG_STATE_COPY) {
- synchronized (stateMutex) { //make sure we don't do two things at
the same time
+ synchronized (stateMutex) { // make sure we don't do two things at
the same time
ArrayList<MapMessage> list = new ArrayList<>();
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if ( entry != null && entry.isSerializable() ) {
+ if (entry != null && entry.isSerializable()) {
boolean copy = (mapmsg.getMsgType() ==
MapMessage.MSG_STATE_COPY);
- MapMessage me = new MapMessage(mapContextName,
-
copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(),
copy?(Serializable) entry.getValue():null, null,
entry.getPrimary(),entry.getBackupNodes());
+ MapMessage me =
+ new MapMessage(mapContextName, copy ?
MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false,
+ (Serializable) entry.getKey(), copy ?
(Serializable) entry.getValue() : null,
+ null, entry.getPrimary(),
entry.getBackupNodes());
list.add(me);
}
}
mapmsg.setValue(list);
return mapmsg;
- } //synchronized
+ } // synchronized
}
// ping
@@ -626,8 +610,8 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public void leftOver(Serializable msg, Member sender) {
- //left over membership messages
- if (! (msg instanceof MapMessage)) {
+ // left over membership messages
+ if (!(msg instanceof MapMessage)) {
return;
}
@@ -650,25 +634,24 @@ public abstract class AbstractReplicatedMap<K,V>
} else {
// other messages are ignored.
if (log.isInfoEnabled()) {
-
log.info(sm.getString("abstractReplicatedMap.leftOver.ignored",
- mapmsg.getTypeDesc()));
+
log.info(sm.getString("abstractReplicatedMap.leftOver.ignored",
mapmsg.getTypeDesc()));
}
}
} catch (IOException | ClassNotFoundException x) {
-
log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"),x);
+
log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"),
x);
}
}
@SuppressWarnings("unchecked")
@Override
public void messageReceived(Serializable msg, Member sender) {
- if (! (msg instanceof MapMessage)) {
+ if (!(msg instanceof MapMessage)) {
return;
}
MapMessage mapmsg = (MapMessage) msg;
- if ( log.isTraceEnabled() ) {
- log.trace("Map["+mapname+"] received message:"+mapmsg);
+ if (log.isTraceEnabled()) {
+ log.trace("Map[" + mapname + "] received message:" + mapmsg);
}
try {
@@ -677,8 +660,8 @@ public abstract class AbstractReplicatedMap<K,V>
log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"),
x);
return;
}
- if ( log.isTraceEnabled() ) {
- log.trace("Map message received from:"+sender.getName()+"
msg:"+mapmsg);
+ if (log.isTraceEnabled()) {
+ log.trace("Map message received from:" + sender.getName() + "
msg:" + mapmsg);
}
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getPrimary());
@@ -690,7 +673,7 @@ public abstract class AbstractReplicatedMap<K,V>
if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
- if ( entry==null ) {
+ if (entry == null) {
entry = new MapEntry<>((K) mapmsg.getKey(), (V)
mapmsg.getValue());
MapEntry<K,V> old = innerMap.putIfAbsent(entry.getKey(),
entry);
if (old != null) {
@@ -717,8 +700,8 @@ public abstract class AbstractReplicatedMap<K,V>
entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
- if (mapmsg.getValue() instanceof ReplicatedMapEntry ) {
-
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
+ if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
+ ((ReplicatedMapEntry)
mapmsg.getValue()).setOwner(getMapOwner());
}
} else {
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
@@ -738,33 +721,33 @@ public abstract class AbstractReplicatedMap<K,V>
diff.unlock();
}
} else {
- if ( mapmsg.getValue()!=null ) {
+ if (mapmsg.getValue() != null) {
if (mapmsg.getValue() instanceof
ReplicatedMapEntry) {
- ReplicatedMapEntry re =
(ReplicatedMapEntry)mapmsg.getValue();
+ ReplicatedMapEntry re = (ReplicatedMapEntry)
mapmsg.getValue();
re.setOwner(getMapOwner());
entry.setValue((V) re);
} else {
entry.setValue((V) mapmsg.getValue());
}
} else {
-
((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
+ ((ReplicatedMapEntry)
entry.getValue()).setOwner(getMapOwner());
}
- } //end if
- } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry re =
(ReplicatedMapEntry)mapmsg.getValue();
+ } // end if
+ } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry re = (ReplicatedMapEntry)
mapmsg.getValue();
re.setOwner(getMapOwner());
entry.setValue((V) re);
} else {
- if ( mapmsg.getValue()!=null ) {
+ if (mapmsg.getValue() != null) {
entry.setValue((V) mapmsg.getValue());
}
- } //end if
- } //end if
+ } // end if
+ } // end if
innerMap.put(entry.getKey(), entry);
- } //end if
+ } // end if
if (mapmsg.getMsgType() == MapMessage.MSG_ACCESS) {
- MapEntry<K, V> entry = innerMap.get(mapmsg.getKey());
+ MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
if (entry != null) {
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
@@ -775,7 +758,7 @@ public abstract class AbstractReplicatedMap<K,V>
}
if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) {
- MapEntry<K, V> entry = innerMap.get(mapmsg.getKey());
+ MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
if (entry != null) {
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
@@ -790,30 +773,30 @@ public abstract class AbstractReplicatedMap<K,V>
public boolean accept(Serializable msg, Member sender) {
boolean result = false;
if (msg instanceof MapMessage) {
- if ( log.isTraceEnabled() ) {
- log.trace("Map["+mapname+"] accepting...."+msg);
+ if (log.isTraceEnabled()) {
+ log.trace("Map[" + mapname + "] accepting...." + msg);
}
- result = Arrays.equals(mapContextName, ( (MapMessage)
msg).getMapId());
- if ( log.isTraceEnabled() ) {
- log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
+ result = Arrays.equals(mapContextName, ((MapMessage)
msg).getMapId());
+ if (log.isTraceEnabled()) {
+ log.trace("Msg[" + mapname + "] accepted[" + result + "]...."
+ msg);
}
}
return result;
}
public void mapMemberAdded(Member member) {
- if ( member.equals(getChannel().getLocalMember(false)) ) {
+ if (member.equals(getChannel().getLocalMember(false))) {
return;
}
boolean memberAdded = false;
- //select a backup node if we don't have one
+ // select a backup node if we don't have one
Member mapMember = getChannel().getMember(member);
if (mapMember == null) {
log.warn(sm.getString("abstractReplicatedMap.mapMemberAdded.nullMember",
member));
return;
}
synchronized (mapMembers) {
- if (!mapMembers.containsKey(mapMember) ) {
+ if (!mapMembers.containsKey(mapMember)) {
if (log.isInfoEnabled()) {
log.info(sm.getString("abstractReplicatedMap.mapMemberAdded.added", mapMember));
}
@@ -821,11 +804,11 @@ public abstract class AbstractReplicatedMap<K,V>
memberAdded = true;
}
}
- if ( memberAdded ) {
+ if (memberAdded) {
synchronized (stateMutex) {
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if ( entry == null ) {
+ if (entry == null) {
continue;
}
if (entry.isPrimary() && (entry.getBackupNodes() == null
|| entry.getBackupNodes().length == 0)) {
@@ -835,15 +818,15 @@ public abstract class AbstractReplicatedMap<K,V>
entry.setPrimary(channel.getLocalMember(false));
} catch (ChannelException x) {
log.error(sm.getString("abstractReplicatedMap.unableSelect.backup"), x);
- } //catch
- } //end if
- } //while
- } //synchronized
- }//end if
+ } // catch
+ } // end if
+ } // while
+ } // synchronized
+ } // end if
}
public boolean inSet(Member m, Member[] set) {
- if ( set == null ) {
+ if (set == null) {
return false;
}
boolean result = false;
@@ -875,19 +858,19 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public void memberAdded(Member member) {
- //do nothing
+ // do nothing
}
@Override
public void memberDisappeared(Member member) {
boolean removed = false;
synchronized (mapMembers) {
- removed = (mapMembers.remove(member) != null );
+ removed = (mapMembers.remove(member) != null);
if (!removed) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("replicatedMap.member.disappeared.unknown", member));
}
- return; //the member was not part of our map.
+ return; // the member was not part of our map.
}
}
if (log.isInfoEnabled()) {
@@ -898,10 +881,10 @@ public abstract class AbstractReplicatedMap<K,V>
while (i.hasNext()) {
Map.Entry<K,MapEntry<K,V>> e = i.next();
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if (entry==null) {
+ if (entry == null) {
continue;
}
- if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
+ if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("abstractReplicatedMap.newBackup"));
}
@@ -917,23 +900,18 @@ public abstract class AbstractReplicatedMap<K,V>
log.debug(sm.getString("abstractReplicatedMap.primaryDisappeared"));
}
entry.setPrimary(null);
- } //end if
-
- if ( entry.isProxy() &&
- entry.getPrimary() == null &&
- entry.getBackupNodes()!=null &&
- entry.getBackupNodes().length == 1 &&
- entry.getBackupNodes()[0].equals(member) ) {
- //remove proxies that have no backup nor primaries
+ } // end if
+
+ if (entry.isProxy() && entry.getPrimary() == null &&
entry.getBackupNodes() != null &&
+ entry.getBackupNodes().length == 1 &&
entry.getBackupNodes()[0].equals(member)) {
+ // remove proxies that have no backup nor primaries
if (log.isDebugEnabled()) {
log.debug(sm.getString("abstractReplicatedMap.removeOrphan"));
}
i.remove();
- } else if ( entry.getPrimary() == null &&
- entry.isBackup() &&
- entry.getBackupNodes()!=null &&
- entry.getBackupNodes().length == 1 &&
-
entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+ } else if (entry.getPrimary() == null && entry.isBackup() &&
entry.getBackupNodes() != null &&
+ entry.getBackupNodes().length == 1 &&
+
entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) {
try {
if (log.isDebugEnabled()) {
log.debug(sm.getString("abstractReplicatedMap.newPrimary"));
@@ -944,8 +922,8 @@ public abstract class AbstractReplicatedMap<K,V>
entry.setCopy(false);
Member[] backup = publishEntryInfo(entry.getKey(),
entry.getValue());
entry.setBackupNodes(backup);
- if ( mapOwner!=null ) {
-
mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
+ if (mapOwner != null) {
+ mapOwner.objectMadePrimary(entry.getKey(),
entry.getValue());
}
} catch (ChannelException x) {
@@ -953,11 +931,10 @@ public abstract class AbstractReplicatedMap<K,V>
}
}
- } //while
+ } // while
long complete = System.currentTimeMillis() - start;
if (log.isInfoEnabled()) {
- log.info(sm.getString("abstractReplicatedMap.relocate.complete",
- Long.toString(complete)));
+ log.info(sm.getString("abstractReplicatedMap.relocate.complete",
Long.toString(complete)));
}
}
@@ -975,13 +952,14 @@ public abstract class AbstractReplicatedMap<K,V>
return node;
}
}
+
public Member getNextBackupNode() {
Member[] members = getMapMembers();
int node = getNextBackupIndex();
- if ( members.length == 0 || node==-1) {
+ if (members.length == 0 || node == -1) {
return null;
}
- if ( node >= members.length ) {
+ if (node >= members.length) {
node = 0;
}
return members[node];
@@ -989,9 +967,12 @@ public abstract class AbstractReplicatedMap<K,V>
/**
* Publish info about a map pair (key/value) to other nodes in the cluster.
- * @param key Object
+ *
+ * @param key Object
* @param value Object
+ *
* @return Member - the backup node
+ *
* @throws ChannelException Cluster error
*/
protected abstract Member[] publishEntryInfo(Object key, Object value)
throws ChannelException;
@@ -1002,31 +983,33 @@ public abstract class AbstractReplicatedMap<K,V>
if (this.state.isAvailable()) {
ping(accessTimeout);
}
- }catch ( Exception x ) {
-
log.error(sm.getString("abstractReplicatedMap.heartbeat.failed"),x);
+ } catch (Exception x) {
+ log.error(sm.getString("abstractReplicatedMap.heartbeat.failed"),
x);
}
}
-//------------------------------------------------------------------------------
-// METHODS TO OVERRIDE
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // METHODS TO OVERRIDE
+ //
------------------------------------------------------------------------------
@Override
public V remove(Object key) {
- return remove(key,true);
+ return remove(key, true);
}
+
public V remove(Object key, boolean notify) {
MapEntry<K,V> entry = innerMap.remove(key);
try {
if (getMapMembers().length > 0 && notify) {
- MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null);
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_REMOVE, false, (Serializable) key,
+ null, null, null, null);
getChannel().send(getMapMembers(), msg,
getChannelSendOptions());
}
- } catch ( ChannelException x ) {
- log.error(sm.getString("abstractReplicatedMap.unable.remove"),x);
+ } catch (ChannelException x) {
+ log.error(sm.getString("abstractReplicatedMap.unable.remove"), x);
}
- return entry!=null?entry.getValue():null;
+ return entry != null ? entry.getValue() : null;
}
public MapEntry<K,V> getInternal(Object key) {
@@ -1038,58 +1021,60 @@ public abstract class AbstractReplicatedMap<K,V>
public V get(Object key) {
MapEntry<K,V> entry = innerMap.get(key);
if (log.isTraceEnabled()) {
- log.trace("Requesting id:"+key+" entry:"+entry);
+ log.trace("Requesting id:" + key + " entry:" + entry);
}
- if ( entry == null ) {
+ if (entry == null) {
return null;
}
- if ( !entry.isPrimary() ) {
- //if the message is not primary, we need to retrieve the latest
value
+ if (!entry.isPrimary()) {
+ // if the message is not primary, we need to retrieve the latest
value
try {
Member[] backup = null;
MapMessage msg = null;
if (entry.isBackup()) {
- //select a new backup node
+ // select a new backup node
backup = publishEntryInfo(key, entry.getValue());
- } else if ( entry.isProxy() ) {
- //make sure we don't retrieve from ourselves
- msg = new MapMessage(getMapContextName(),
MapMessage.MSG_RETRIEVE_BACKUP, false,
- (Serializable) key, null, null,
null,null);
- Response[] resp =
getRpcChannel().send(entry.getBackupNodes(),msg, RpcChannel.FIRST_REPLY,
getChannelSendOptions(), getRpcTimeout());
+ } else if (entry.isProxy()) {
+ // make sure we don't retrieve from ourselves
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key,
+ null, null, null, null);
+ Response[] resp =
getRpcChannel().send(entry.getBackupNodes(), msg, RpcChannel.FIRST_REPLY,
+ getChannelSendOptions(), getRpcTimeout());
if (resp == null || resp.length == 0 ||
resp[0].getMessage() == null) {
- //no responses
+ // no responses
log.warn(sm.getString("abstractReplicatedMap.unable.retrieve", key));
return null;
}
msg = (MapMessage) resp[0].getMessage();
msg.deserialize(getExternalLoaders());
backup = entry.getBackupNodes();
- if ( msg.getValue()!=null ) {
+ if (msg.getValue() != null) {
entry.setValue((V) msg.getValue());
}
// notify member
- msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
- (Serializable)entry.getKey(), null, null,
channel.getLocalMember(false), backup);
- if ( backup != null && backup.length > 0) {
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER, false,
+ (Serializable) entry.getKey(), null, null,
channel.getLocalMember(false), backup);
+ if (backup != null && backup.length > 0) {
getChannel().send(backup, msg,
getChannelSendOptions());
}
- //invalidate the previous primary
- msg = new
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
+ // invalidate the previous primary
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_PROXY, false, (Serializable) key, null,
+ null, channel.getLocalMember(false), backup);
Member[] dest = getMapMembersExcl(backup);
- if ( dest!=null && dest.length >0) {
+ if (dest != null && dest.length > 0) {
getChannel().send(dest, msg, getChannelSendOptions());
}
if (entry.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry val =
(ReplicatedMapEntry)entry.getValue();
+ ReplicatedMapEntry val = (ReplicatedMapEntry)
entry.getValue();
val.setOwner(getMapOwner());
}
- } else if ( entry.isCopy() ) {
+ } else if (entry.isCopy()) {
backup = getMapMembers();
if (backup.length > 0) {
- msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
-
(Serializable)key,null,null,channel.getLocalMember(false),backup);
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER, false,
+ (Serializable) key, null, null,
channel.getLocalMember(false), backup);
getChannel().send(backup, msg,
getChannelSendOptions());
}
}
@@ -1098,7 +1083,7 @@ public abstract class AbstractReplicatedMap<K,V>
entry.setBackup(false);
entry.setProxy(false);
entry.setCopy(false);
- if ( getMapOwner()!=null ) {
+ if (getMapOwner() != null) {
getMapOwner().objectMadePrimary(key, entry.getValue());
}
@@ -1108,7 +1093,7 @@ public abstract class AbstractReplicatedMap<K,V>
}
}
if (log.isTraceEnabled()) {
- log.trace("Requesting id:"+key+" result:"+entry.getValue());
+ log.trace("Requesting id:" + key + " result:" + entry.getValue());
}
return entry.getValue();
}
@@ -1116,23 +1101,22 @@ public abstract class AbstractReplicatedMap<K,V>
protected void printMap(String header) {
try {
- System.out.println("\nDEBUG MAP:"+header);
- System.out.println("Map[" +
- new String(mapContextName, StandardCharsets.ISO_8859_1) +
- ", Map Size:" + innerMap.size());
+ System.out.println("\nDEBUG MAP:" + header);
+ System.out.println(
+ "Map[" + new String(mapContextName,
StandardCharsets.ISO_8859_1) + ", Map Size:" + innerMap.size());
Member[] mbrs = getMapMembers();
- for ( int i=0; i<mbrs.length;i++ ) {
- System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
+ for (int i = 0; i < mbrs.length; i++) {
+ System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName());
}
Iterator<Map.Entry<K,MapEntry<K,V>>> i =
innerMap.entrySet().iterator();
int cnt = 0;
while (i.hasNext()) {
Map.Entry<?,?> e = i.next();
- System.out.println( (++cnt) + ". " + innerMap.get(e.getKey()));
+ System.out.println((++cnt) + ". " + innerMap.get(e.getKey()));
}
System.out.println("EndMap]\n\n");
- }catch ( Exception ignore) {
+ } catch (Exception ignore) {
if (log.isTraceEnabled()) {
log.trace("Error printing map", ignore);
}
@@ -1140,10 +1124,11 @@ public abstract class AbstractReplicatedMap<K,V>
}
/**
- * Returns true if the key has an entry in the map.
- * The entry can be a proxy or a backup entry, invoking
<code>get(key)</code>
- * will make this entry primary for the group
+ * Returns true if the key has an entry in the map. The entry can be a
proxy or a backup entry, invoking
+ * <code>get(key)</code> will make this entry primary for the group
+ *
* @param key Object
+ *
* @return boolean
*/
@Override
@@ -1165,28 +1150,28 @@ public abstract class AbstractReplicatedMap<K,V>
V old = null;
- //make sure that any old values get removed
- if ( containsKey(key) ) {
+ // make sure that any old values get removed
+ if (containsKey(key)) {
old = remove(key);
}
try {
- if ( notify ) {
+ if (notify) {
Member[] backup = publishEntryInfo(key, value);
entry.setBackupNodes(backup);
}
} catch (ChannelException x) {
log.error(sm.getString("abstractReplicatedMap.unable.put"), x);
}
- innerMap.put(key,entry);
+ innerMap.put(key, entry);
return old;
}
@Override
- public void putAll(Map<? extends K, ? extends V> m) {
- for (Entry<? extends K, ? extends V> value : m.entrySet()) {
+ public void putAll(Map<? extends K,? extends V> m) {
+ for (Entry<? extends K,? extends V> value : m.entrySet()) {
@SuppressWarnings("unchecked")
- Entry<K, V> entry = (Entry<K, V>) value;
+ Entry<K,V> entry = (Entry<K,V>) value;
put(entry.getKey(), entry.getValue());
}
}
@@ -1197,8 +1182,8 @@ public abstract class AbstractReplicatedMap<K,V>
}
public void clear(boolean notify) {
- if ( notify ) {
- //only delete active keys
+ if (notify) {
+ // only delete active keys
for (K k : keySet()) {
remove(k);
}
@@ -1210,9 +1195,9 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public boolean containsValue(Object value) {
Objects.requireNonNull(value);
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if (entry!=null && entry.isActive() &&
value.equals(entry.getValue())) {
+ if (entry != null && entry.isActive() &&
value.equals(entry.getValue())) {
return true;
}
}
@@ -1220,9 +1205,9 @@ public abstract class AbstractReplicatedMap<K,V>
}
/**
- * Returns the entire contents of the map
- * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object
containing all the information
- * about the object.
+ * Returns the entire contents of the map Map.Entry.getValue() will return
a LazyReplicatedMap.MapEntry object
+ * containing all the information about the object.
+ *
* @return Set
*/
public Set<Map.Entry<K,MapEntry<K,V>>> entrySetFull() {
@@ -1240,10 +1225,10 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public Set<Map.Entry<K,V>> entrySet() {
LinkedHashSet<Map.Entry<K,V>> set = new
LinkedHashSet<>(innerMap.size());
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
Object key = e.getKey();
MapEntry<K,V> entry = innerMap.get(key);
- if ( entry != null && entry.isActive() ) {
+ if (entry != null && entry.isActive()) {
set.add(entry);
}
}
@@ -1252,13 +1237,13 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public Set<K> keySet() {
- //todo implement
- //should only return keys where this is active.
+ // todo implement
+ // should only return keys where this is active.
LinkedHashSet<K> set = new LinkedHashSet<>(innerMap.size());
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
K key = e.getKey();
MapEntry<K,V> entry = innerMap.get(key);
- if ( entry!=null && entry.isActive() ) {
+ if (entry != null && entry.isActive()) {
set.add(key);
}
}
@@ -1269,15 +1254,15 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public int size() {
- //todo, implement a counter variable instead
- //only count active members in this node
+ // todo, implement a counter variable instead
+ // only count active members in this node
int counter = 0;
Iterator<Map.Entry<K,MapEntry<K,V>>> it =
innerMap.entrySet().iterator();
- while (it!=null && it.hasNext() ) {
+ while (it != null && it.hasNext()) {
Map.Entry<?,?> e = it.next();
- if ( e != null ) {
+ if (e != null) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if (entry!=null && entry.isActive() && entry.getValue() !=
null) {
+ if (entry != null && entry.isActive() && entry.getValue() !=
null) {
counter++;
}
}
@@ -1287,15 +1272,15 @@ public abstract class AbstractReplicatedMap<K,V>
@Override
public boolean isEmpty() {
- return size()==0;
+ return size() == 0;
}
@Override
public Collection<V> values() {
List<V> values = new ArrayList<>();
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if (entry!=null && entry.isActive() && entry.getValue()!=null) {
+ if (entry != null && entry.isActive() && entry.getValue() != null)
{
values.add(entry.getValue());
}
}
@@ -1303,10 +1288,10 @@ public abstract class AbstractReplicatedMap<K,V>
}
-//------------------------------------------------------------------------------
-// Map Entry class
-//------------------------------------------------------------------------------
- public static class MapEntry<K,V> implements Map.Entry<K,V> {
+ //
------------------------------------------------------------------------------
+ // Map Entry class
+ //
------------------------------------------------------------------------------
+ public static class MapEntry<K, V> implements Map.Entry<K,V> {
private boolean backup;
private boolean proxy;
private boolean copy;
@@ -1366,8 +1351,7 @@ public abstract class AbstractReplicatedMap<K,V>
}
public boolean isDiffable() {
- return (value instanceof ReplicatedMapEntry) &&
- ((ReplicatedMapEntry)value).isDiffable();
+ return (value instanceof ReplicatedMapEntry) &&
((ReplicatedMapEntry) value).isDiffable();
}
public void setBackupNodes(Member[] nodes) {
@@ -1421,15 +1405,18 @@ public abstract class AbstractReplicatedMap<K,V>
/**
* apply a diff, or an entire object
- * @param data byte[]
+ *
+ * @param data byte[]
* @param offset int
* @param length int
- * @param diff boolean
- * @throws IOException IO error
+ * @param diff boolean
+ *
+ * @throws IOException IO error
* @throws ClassNotFoundException Deserialization error
*/
@SuppressWarnings("unchecked")
- public void apply(byte[] data, int offset, int length, boolean diff)
throws IOException, ClassNotFoundException {
+ public void apply(byte[] data, int offset, int length, boolean diff)
+ throws IOException, ClassNotFoundException {
if (isDiffable() && diff) {
ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
rentry.lock();
@@ -1459,9 +1446,9 @@ public abstract class AbstractReplicatedMap<K,V>
}
-//------------------------------------------------------------------------------
-// map message to send to and from other maps
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // map message to send to and from other maps
+ //
------------------------------------------------------------------------------
public static class MapMessage implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@@ -1506,26 +1493,39 @@ public abstract class AbstractReplicatedMap<K,V>
public String getTypeDesc() {
switch (msgtype) {
- case MSG_BACKUP: return "MSG_BACKUP";
- case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP";
- case MSG_PROXY: return "MSG_PROXY";
- case MSG_REMOVE: return "MSG_REMOVE";
- case MSG_STATE: return "MSG_STATE";
- case MSG_START: return "MSG_START";
- case MSG_STOP: return "MSG_STOP";
- case MSG_INIT: return "MSG_INIT";
- case MSG_STATE_COPY: return "MSG_STATE_COPY";
- case MSG_COPY: return "MSG_COPY";
- case MSG_ACCESS: return "MSG_ACCESS";
- case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER";
- case MSG_PING: return "MSG_PING";
- default : return "UNKNOWN";
+ case MSG_BACKUP:
+ return "MSG_BACKUP";
+ case MSG_RETRIEVE_BACKUP:
+ return "MSG_RETRIEVE_BACKUP";
+ case MSG_PROXY:
+ return "MSG_PROXY";
+ case MSG_REMOVE:
+ return "MSG_REMOVE";
+ case MSG_STATE:
+ return "MSG_STATE";
+ case MSG_START:
+ return "MSG_START";
+ case MSG_STOP:
+ return "MSG_STOP";
+ case MSG_INIT:
+ return "MSG_INIT";
+ case MSG_STATE_COPY:
+ return "MSG_STATE_COPY";
+ case MSG_COPY:
+ return "MSG_COPY";
+ case MSG_ACCESS:
+ return "MSG_ACCESS";
+ case MSG_NOTIFY_MAPMEMBER:
+ return "MSG_NOTIFY_MAPMEMBER";
+ case MSG_PING:
+ return "MSG_PING";
+ default:
+ return "UNKNOWN";
}
}
- public MapMessage(byte[] mapId,int msgtype, boolean diff,
- Serializable key, Serializable value,
- byte[] diffvalue, Member primary, Member[] nodes) {
+ public MapMessage(byte[] mapId, int msgtype, boolean diff,
Serializable key, Serializable value,
+ byte[] diffvalue, Member primary, Member[] nodes) {
this.mapId = mapId;
this.msgtype = msgtype;
this.diff = diff;
@@ -1554,19 +1554,19 @@ public abstract class AbstractReplicatedMap<K,V>
public Serializable getKey() {
try {
return key(null);
- } catch ( Exception x ) {
+ } catch (Exception x) {
throw new
RuntimeException(sm.getString("mapMessage.deserialize.error.key"), x);
}
}
public Serializable key(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
- if ( key!=null ) {
+ if (key != null) {
return key;
}
- if ( keydata == null || keydata.length == 0 ) {
+ if (keydata == null || keydata.length == 0) {
return null;
}
- key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
+ key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls);
keydata = null;
return key;
}
@@ -1578,19 +1578,19 @@ public abstract class AbstractReplicatedMap<K,V>
public Serializable getValue() {
try {
return value(null);
- } catch ( Exception x ) {
+ } catch (Exception x) {
throw new
RuntimeException(sm.getString("mapMessage.deserialize.error.value"), x);
}
}
- public Serializable value(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
- if ( value!=null ) {
+ public Serializable value(ClassLoader[] cls) throws IOException,
ClassNotFoundException {
+ if (value != null) {
return value;
}
- if ( valuedata == null || valuedata.length == 0 ) {
+ if (valuedata == null || valuedata.length == 0) {
return null;
}
- value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
+ value = XByteBuffer.deserialize(valuedata, 0, valuedata.length,
cls);
valuedata = null;
return value;
}
@@ -1621,11 +1621,11 @@ public abstract class AbstractReplicatedMap<K,V>
public void setValue(Serializable value) {
try {
- if ( value != null ) {
+ if (value != null) {
valuedata = XByteBuffer.serialize(value);
}
this.value = value;
- }catch ( IOException x ) {
+ } catch (IOException x) {
throw new RuntimeException(x);
}
}
@@ -1650,7 +1650,7 @@ public abstract class AbstractReplicatedMap<K,V>
throw new AssertionError();
}
}
- } //MapMessage
+ } // MapMessage
public Channel getChannel() {
diff --git a/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
b/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
index 712ffa0a4c..751b2d15de 100644
--- a/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
+++ b/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
@@ -27,87 +27,93 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
- * A smart implementation of a stateful replicated map. uses primary/secondary
backup strategy.
- * One node is always the primary and one node is always the backup.
- * This map is synchronized across a cluster, and only has one backup
member.<br>
+ * A smart implementation of a stateful replicated map. uses primary/secondary
backup strategy. One node is always the
+ * primary and one node is always the backup. This map is synchronized across
a cluster, and only has one backup
+ * member.<br>
* A perfect usage for this map would be a session map for a session manager
in a clustered environment.<br>
- * The only way to modify this list is to use the <code>put, putAll,
remove</code> methods.
- * entrySet, entrySetFull, keySet, keySetFull, returns all non modifiable
sets.<br><br>
- * If objects (values) in the map change without invoking <code>put()</code>
or <code>remove()</code>
- * the data can be distributed using two different methods:<br>
+ * The only way to modify this list is to use the <code>put, putAll,
remove</code> methods. entrySet, entrySetFull,
+ * keySet, keySetFull, returns all non modifiable sets.<br>
+ * <br>
+ * If objects (values) in the map change without invoking <code>put()</code>
or <code>remove()</code> the data can be
+ * distributed using two different methods:<br>
* <code>replicate(boolean)</code> and <code>replicate(Object,
boolean)</code><br>
* These two methods are very important two understand. The map can work with
two set of value objects:<br>
* 1. Serializable - the entire object gets serialized each time it is
replicated<br>
* 2. ReplicatedMapEntry - this interface allows for a isDirty() flag and to
replicate diffs if desired.<br>
- * Implementing the <code>ReplicatedMapEntry</code> interface allows you to
decide what objects
- * get replicated and how much data gets replicated each time.<br>
- * If you implement a smart AOP mechanism to detect changes in underlying
objects, you can replicate
- * only those changes by implementing the ReplicatedMapEntry interface, and
return true when isDiffable()
- * is invoked.<br><br>
- *
- * This map implementation doesn't have a background thread running to
replicate changes.
- * If you do have changes without invoking put/remove then you need to invoke
one of the following methods:
+ * Implementing the <code>ReplicatedMapEntry</code> interface allows you to
decide what objects get replicated and how
+ * much data gets replicated each time.<br>
+ * If you implement a smart AOP mechanism to detect changes in underlying
objects, you can replicate only those changes
+ * by implementing the ReplicatedMapEntry interface, and return true when
isDiffable() is invoked.<br>
+ * <br>
+ * This map implementation doesn't have a background thread running to
replicate changes. If you do have changes without
+ * invoking put/remove then you need to invoke one of the following methods:
* <ul>
* <li><code>replicate(Object,boolean)</code> - replicates only the object
that belongs to the key</li>
* <li><code>replicate(boolean)</code> - Scans the entire map for changes and
replicates data</li>
- * </ul>
- * the <code>boolean</code> value in the <code>replicate</code> method used to
decide
- * whether to only replicate objects that implement the
<code>ReplicatedMapEntry</code> interface
- * or to replicate all objects. If an object doesn't implement the
<code>ReplicatedMapEntry</code> interface
- * each time the object gets replicated the entire object gets serialized,
hence a call to <code>replicate(true)</code>
- * will replicate all objects in this map that are using this node as primary.
- *
- * <br><br><b>REMEMBER TO CALL</b> <code>breakdown()</code> when you are done
with the map to
- * avoid memory leaks.<br><br>
+ * </ul>
+ * the <code>boolean</code> value in the <code>replicate</code> method used to
decide whether to only replicate objects
+ * that implement the <code>ReplicatedMapEntry</code> interface or to
replicate all objects. If an object doesn't
+ * implement the <code>ReplicatedMapEntry</code> interface each time the
object gets replicated the entire object gets
+ * serialized, hence a call to <code>replicate(true)</code> will replicate all
objects in this map that are using this
+ * node as primary. <br>
+ * <br>
+ * <b>REMEMBER TO CALL</b> <code>breakdown()</code> when you are done with the
map to avoid memory leaks.<br>
+ * <br>
* TODO implement periodic sync/transfer thread
*
* @param <K> The type of Key
* @param <V> The type of Value
*/
-public class LazyReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> {
+public class LazyReplicatedMap<K, V> extends AbstractReplicatedMap<K,V> {
private static final long serialVersionUID = 1L;
// Lazy init to support serialization
private transient volatile Log log;
-//------------------------------------------------------------------------------
-// CONSTRUCTORS / DESTRUCTORS
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // CONSTRUCTORS / DESTRUCTORS
+ //
------------------------------------------------------------------------------
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
- * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
- * @param cls Class loaders
+ * @param loadFactor float - load factor, see HashMap
+ * @param cls Class loaders
*/
- public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, int initialCapacity, float loadFactor, ClassLoader[]
cls) {
- super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor,
Channel.SEND_OPTIONS_DEFAULT,cls, true);
+ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, int initialCapacity,
+ float loadFactor, ClassLoader[] cls) {
+ super(owner, channel, timeout, mapContextName, initialCapacity,
loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls,
+ true);
}
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
- * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
- * @param cls Class loaders
+ * @param cls Class loaders
*/
- public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, int initialCapacity, ClassLoader[] cls) {
+ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, int initialCapacity,
+ ClassLoader[] cls) {
super(owner, channel, timeout, mapContextName, initialCapacity,
DEFAULT_LOAD_FACTOR,
Channel.SEND_OPTIONS_DEFAULT, cls, true);
}
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
- * @param cls Class loaders
+ * @param cls Class loaders
*/
public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, ClassLoader[] cls) {
super(owner, channel, timeout, mapContextName,
DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
@@ -116,22 +122,24 @@ public class LazyReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
- * @param cls Class loaders
- * @param terminate boolean - Flag for whether to terminate this map that
failed to start.
+ * @param cls Class loaders
+ * @param terminate boolean - Flag for whether to terminate this map
that failed to start.
*/
- public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, ClassLoader[] cls, boolean terminate) {
+ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout,
String mapContextName, ClassLoader[] cls,
+ boolean terminate) {
super(owner, channel, timeout, mapContextName,
DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
Channel.SEND_OPTIONS_DEFAULT, cls, terminate);
}
-//------------------------------------------------------------------------------
-// METHODS TO OVERRIDE
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // METHODS TO OVERRIDE
+ //
------------------------------------------------------------------------------
@Override
protected int getStateMessageType() {
return AbstractReplicatedMap.MapMessage.MSG_STATE;
@@ -145,7 +153,7 @@ public class LazyReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
@Override
protected Member[] publishEntryInfo(Object key, Object value) throws
ChannelException {
Log log = getLog();
- if (! (key instanceof Serializable && value instanceof Serializable)
) {
+ if (!(key instanceof Serializable && value instanceof Serializable)) {
return new Member[0];
}
Member[] members = getMapMembers();
@@ -153,19 +161,19 @@ public class LazyReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
int nextIdx = firstIdx;
Member[] backup = new Member[0];
- //there are no backups
- if ( members.length == 0 || firstIdx == -1 ) {
+ // there are no backups
+ if (members.length == 0 || firstIdx == -1) {
return backup;
}
boolean success = false;
do {
- //select a backup node
+ // select a backup node
Member next = members[nextIdx];
- //increment for the next round of back up selection
+ // increment for the next round of back up selection
nextIdx = nextIdx + 1;
- if ( nextIdx >= members.length ) {
+ if (nextIdx >= members.length) {
nextIdx = 0;
}
@@ -175,41 +183,41 @@ public class LazyReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
MapMessage msg = null;
try {
Member[] tmpBackup = wrap(next);
- //publish the backup data to one node
- msg = new MapMessage(getMapContextName(),
MapMessage.MSG_BACKUP, false,
- (Serializable) key, (Serializable) value,
null, channel.getLocalMember(false), tmpBackup);
- if ( log.isTraceEnabled() ) {
- log.trace("Publishing backup data:"+msg+" to:
"+next.getName());
+ // publish the backup data to one node
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_BACKUP, false, (Serializable) key,
+ (Serializable) value, null,
channel.getLocalMember(false), tmpBackup);
+ if (log.isTraceEnabled()) {
+ log.trace("Publishing backup data:" + msg + " to: " +
next.getName());
}
UniqueId id = getChannel().send(tmpBackup, msg,
getChannelSendOptions());
- if ( log.isTraceEnabled() ) {
- log.trace("Data published:"+msg+" msg Id:"+id);
+ if (log.isTraceEnabled()) {
+ log.trace("Data published:" + msg + " msg Id:" + id);
}
- //we published out to a backup, mark the test success
+ // we published out to a backup, mark the test success
success = true;
backup = tmpBackup;
- }catch ( ChannelException x ) {
+ } catch (ChannelException x) {
log.error(sm.getString("lazyReplicatedMap.unableReplicate.backup", key, next,
x.getMessage()), x);
continue;
}
try {
- //publish the data out to all nodes
+ // publish the data out to all nodes
Member[] proxies = excludeFromSet(backup, getMapMembers());
- if (success && proxies.length > 0 ) {
- msg = new MapMessage(getMapContextName(),
MapMessage.MSG_PROXY, false,
- (Serializable) key, null, null,
channel.getLocalMember(false),backup);
- if ( log.isTraceEnabled() ) {
- log.trace("Publishing proxy data:"+msg+" to:
"+Arrays.toNameString(proxies));
+ if (success && proxies.length > 0) {
+ msg = new MapMessage(getMapContextName(),
MapMessage.MSG_PROXY, false, (Serializable) key, null,
+ null, channel.getLocalMember(false), backup);
+ if (log.isTraceEnabled()) {
+ log.trace("Publishing proxy data:" + msg + " to: " +
Arrays.toNameString(proxies));
}
getChannel().send(proxies, msg, getChannelSendOptions());
}
- }catch ( ChannelException x ) {
- //log the error, but proceed, this should only happen if a
node went down,
- //and if the node went down, then it can't receive the
message, the others
- //should still get it.
+ } catch (ChannelException x) {
+ // log the error, but proceed, this should only happen if a
node went down,
+ // and if the node went down, then it can't receive the
message, the others
+ // should still get it.
log.error(sm.getString("lazyReplicatedMap.unableReplicate.proxy", key, next,
x.getMessage()), x);
}
- } while ( !success && (firstIdx!=nextIdx));
+ } while (!success && (firstIdx != nextIdx));
return backup;
}
diff --git a/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
b/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
index 92c9d6f2e2..a935098579 100644
--- a/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
+++ b/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
@@ -30,74 +30,80 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
- * All-to-all replication for a hash map implementation. Each node in the
cluster will carry an identical
- * copy of the map.<br><br>
- * This map implementation doesn't have a background thread running to
replicate changes.
- * If you do have changes without invoking put/remove then you need to invoke
one of the following methods:
+ * All-to-all replication for a hash map implementation. Each node in the
cluster will carry an identical copy of the
+ * map.<br>
+ * <br>
+ * This map implementation doesn't have a background thread running to
replicate changes. If you do have changes without
+ * invoking put/remove then you need to invoke one of the following methods:
* <ul>
* <li><code>replicate(Object,boolean)</code> - replicates only the object
that belongs to the key</li>
* <li><code>replicate(boolean)</code> - Scans the entire map for changes and
replicates data</li>
- * </ul>
- * the <code>boolean</code> value in the <code>replicate</code> method used to
decide
- * whether to only replicate objects that implement the
<code>ReplicatedMapEntry</code> interface
- * or to replicate all objects. If an object doesn't implement the
<code>ReplicatedMapEntry</code> interface
- * each time the object gets replicated the entire object gets serialized,
hence a call to <code>replicate(true)</code>
- * will replicate all objects in this map that are using this node as primary.
- *
- * <br><br><b>REMEMBER TO CALL <code>breakdown()</code>
- * when you are done with the map to avoid memory leaks.</b><br><br>
+ * </ul>
+ * the <code>boolean</code> value in the <code>replicate</code> method used to
decide whether to only replicate objects
+ * that implement the <code>ReplicatedMapEntry</code> interface or to
replicate all objects. If an object doesn't
+ * implement the <code>ReplicatedMapEntry</code> interface each time the
object gets replicated the entire object gets
+ * serialized, hence a call to <code>replicate(true)</code> will replicate all
objects in this map that are using this
+ * node as primary. <br>
+ * <br>
+ * <b>REMEMBER TO CALL <code>breakdown()</code> when you are done with the map
to avoid memory leaks.</b><br>
+ * <br>
* TODO implement periodic sync/transfer thread<br>
- * TODO memberDisappeared, should do nothing except change map membership
- * by default it relocates the primary objects
+ * TODO memberDisappeared, should do nothing except change map membership by
default it relocates the primary objects
*
* @param <K> The type of Key
* @param <V> The type of Value
*/
-public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> {
+public class ReplicatedMap<K, V> extends AbstractReplicatedMap<K,V> {
private static final long serialVersionUID = 1L;
// Lazy init to support serialization
private transient volatile Log log;
-
//--------------------------------------------------------------------------
- // CONSTRUCTORS / DESTRUCTORS
-
//--------------------------------------------------------------------------
+ //
--------------------------------------------------------------------------
+ // CONSTRUCTORS / DESTRUCTORS
+ //
--------------------------------------------------------------------------
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
- * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
- * @param cls Class loaders
+ * @param loadFactor float - load factor, see HashMap
+ * @param cls Class loaders
*/
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,float loadFactor, ClassLoader[] cls) {
- super(owner,channel, timeout, mapContextName, initialCapacity,
loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls, true);
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,
+ float loadFactor, ClassLoader[] cls) {
+ super(owner, channel, timeout, mapContextName, initialCapacity,
loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls,
+ true);
}
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
- * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow
multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
- * @param cls Class loaders
+ * @param cls Class loaders
*/
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, int initialCapacity, ClassLoader[] cls) {
- super(owner,channel, timeout, mapContextName, initialCapacity,
DEFAULT_LOAD_FACTOR,
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, int initialCapacity,
+ ClassLoader[] cls) {
+ super(owner, channel, timeout, mapContextName, initialCapacity,
DEFAULT_LOAD_FACTOR,
Channel.SEND_OPTIONS_DEFAULT, cls, true);
}
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
- * @param cls Class loaders
+ * @param cls Class loaders
*/
public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, ClassLoader[] cls) {
super(owner, channel, timeout, mapContextName,
DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
@@ -106,21 +112,23 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
/**
* Creates a new map
- * @param owner The map owner
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messages
+ *
+ * @param owner The map owner
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
* @param mapContextName String - unique name for this map, to allow
multiple maps per channel
- * @param cls Class loaders
- * @param terminate boolean - Flag for whether to terminate this map that
failed to start.
+ * @param cls Class loaders
+ * @param terminate boolean - Flag for whether to terminate this map
that failed to start.
*/
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, ClassLoader[] cls, boolean terminate) {
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String
mapContextName, ClassLoader[] cls,
+ boolean terminate) {
super(owner, channel, timeout, mapContextName,
DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
Channel.SEND_OPTIONS_DEFAULT, cls, terminate);
}
-//------------------------------------------------------------------------------
-// METHODS TO OVERRIDE
-//------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------
+ // METHODS TO OVERRIDE
+ //
------------------------------------------------------------------------------
@Override
protected int getStateMessageType() {
return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
@@ -133,10 +141,10 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
@Override
protected Member[] publishEntryInfo(Object key, Object value) throws
ChannelException {
- if (! (key instanceof Serializable && value instanceof Serializable)
) {
+ if (!(key instanceof Serializable && value instanceof Serializable)) {
return new Member[0];
}
- //select a backup node
+ // select a backup node
Member[] backup = getMapMembers();
if (backup == null || backup.length == 0) {
@@ -144,9 +152,9 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
}
try {
- //publish the data out to all nodes
- MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_COPY, false,
- (Serializable) key, (Serializable) value,
null,channel.getLocalMember(false), backup);
+ // publish the data out to all nodes
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_COPY, false, (Serializable) key,
+ (Serializable) value, null, channel.getLocalMember(false),
backup);
getChannel().send(backup, msg, getChannelSendOptions());
} catch (ChannelException e) {
@@ -181,29 +189,29 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
boolean removed = false;
Log log = getLog();
synchronized (mapMembers) {
- removed = (mapMembers.remove(member) != null );
+ removed = (mapMembers.remove(member) != null);
if (!removed) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("replicatedMap.member.disappeared.unknown", member));
}
- return; //the member was not part of our map.
+ return; // the member was not part of our map.
}
}
if (log.isInfoEnabled()) {
log.info(sm.getString("replicatedMap.member.disappeared", member));
}
long start = System.currentTimeMillis();
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if (entry==null) {
+ if (entry == null) {
continue;
}
if (entry.isPrimary()) {
try {
Member[] backup = getMapMembers();
if (backup.length > 0) {
- MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
-
(Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER, false,
+ (Serializable) entry.getKey(), null, null,
channel.getLocalMember(false), backup);
getChannel().send(backup, msg,
getChannelSendOptions());
}
entry.setBackupNodes(backup);
@@ -215,11 +223,9 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
entry.setPrimary(null);
}
- if ( entry.getPrimary() == null &&
- entry.isCopy() &&
- entry.getBackupNodes()!=null &&
- entry.getBackupNodes().length > 0 &&
-
entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+ if (entry.getPrimary() == null && entry.isCopy() &&
entry.getBackupNodes() != null &&
+ entry.getBackupNodes().length > 0 &&
+
entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) {
try {
entry.setPrimary(channel.getLocalMember(false));
entry.setBackup(false);
@@ -227,13 +233,13 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
entry.setCopy(false);
Member[] backup = getMapMembers();
if (backup.length > 0) {
- MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
-
(Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+ MapMessage msg = new MapMessage(getMapContextName(),
MapMessage.MSG_NOTIFY_MAPMEMBER, false,
+ (Serializable) entry.getKey(), null, null,
channel.getLocalMember(false), backup);
getChannel().send(backup, msg,
getChannelSendOptions());
}
entry.setBackupNodes(backup);
- if ( mapOwner!=null ) {
-
mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
+ if (mapOwner != null) {
+ mapOwner.objectMadePrimary(entry.getKey(),
entry.getValue());
}
} catch (ChannelException x) {
@@ -241,35 +247,34 @@ public class ReplicatedMap<K,V> extends
AbstractReplicatedMap<K,V> {
}
}
- } //while
+ } // while
long complete = System.currentTimeMillis() - start;
if (log.isInfoEnabled()) {
- log.info(sm.getString("replicatedMap.relocate.complete",
- Long.toString(complete)));
+ log.info(sm.getString("replicatedMap.relocate.complete",
Long.toString(complete)));
}
}
@Override
public void mapMemberAdded(Member member) {
- if ( member.equals(getChannel().getLocalMember(false)) ) {
+ if (member.equals(getChannel().getLocalMember(false))) {
return;
}
boolean memberAdded = false;
synchronized (mapMembers) {
- if (!mapMembers.containsKey(member) ) {
+ if (!mapMembers.containsKey(member)) {
mapMembers.put(member,
Long.valueOf(System.currentTimeMillis()));
memberAdded = true;
}
}
- if ( memberAdded ) {
+ if (memberAdded) {
synchronized (stateMutex) {
Member[] backup = getMapMembers();
- for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) {
+ for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
MapEntry<K,V> entry = innerMap.get(e.getKey());
- if ( entry == null ) {
+ if (entry == null) {
continue;
}
- if (entry.isPrimary() &&
!inSet(member,entry.getBackupNodes())) {
+ if (entry.isPrimary() && !inSet(member,
entry.getBackupNodes())) {
entry.setBackupNodes(backup);
}
}
diff --git a/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
b/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
index 7b97aef157..9cf98380e8 100644
--- a/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
+++ b/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
@@ -32,8 +32,7 @@ import java.io.Serializable;
* 5. entry.unlock();<br>
* }<br>
* }<br>
- * </code>
- * <br>
+ * </code> <br>
* <br>
* When the data is deserialized the logic is called in the following order<br>
* <code>
@@ -44,22 +43,25 @@ import java.io.Serializable;
public interface ReplicatedMapEntry extends Serializable {
/**
- * Has the object changed since last replication
- * and is not in a locked state
+ * Has the object changed since last replication and is not in a locked
state
+ *
* @return boolean
*/
boolean isDirty();
/**
- * If this returns true, the map will extract the diff using getDiff()
- * Otherwise it will serialize the entire object.
+ * If this returns true, the map will extract the diff using getDiff()
Otherwise it will serialize the entire
+ * object.
+ *
* @return boolean
*/
boolean isDiffable();
/**
* Returns a diff and sets the dirty map to false
+ *
* @return Serialized diff data
+ *
* @throws IOException IO error serializing
*/
byte[] getDiff() throws IOException;
@@ -67,10 +69,12 @@ public interface ReplicatedMapEntry extends Serializable {
/**
* Applies a diff to an existing object.
- * @param diff Serialized diff data
+ *
+ * @param diff Serialized diff data
* @param offset Array offset
* @param length Array length
- * @throws IOException IO error deserializing
+ *
+ * @throws IOException IO error deserializing
* @throws ClassNotFoundException Serialization error
*/
void applyDiff(byte[] diff, int offset, int length) throws IOException,
ClassNotFoundException;
@@ -91,24 +95,24 @@ public interface ReplicatedMapEntry extends Serializable {
void unlock();
/**
- * This method is called after the object has been
- * created on a remote map. On this method,
- * the object can initialize itself for any data that wasn't
+ * This method is called after the object has been created on a remote
map. On this method, the object can
+ * initialize itself for any data that wasn't
*
* @param owner Object
*/
void setOwner(Object owner);
/**
- * For accuracy checking, a serialized attribute can contain a version
number
- * This number increases as modifications are made to the data.
- * The replicated map can use this to ensure accuracy on a periodic basis
+ * For accuracy checking, a serialized attribute can contain a version
number This number increases as modifications
+ * are made to the data. The replicated map can use this to ensure
accuracy on a periodic basis
+ *
* @return long - the version number or -1 if the data is not versioned
*/
long getVersion();
/**
* Forces a certain version to a replicated map entry<br>
+ *
* @param version long
*/
void setVersion(long version);
@@ -120,12 +124,14 @@ public interface ReplicatedMapEntry extends Serializable {
/**
* Set the last replicate time.
+ *
* @param lastTimeReplicated New timestamp
*/
void setLastTimeReplicated(long lastTimeReplicated);
/**
* If this returns true, to replicate that an object has been accessed
+ *
* @return boolean
*/
boolean isAccessReplicate();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]