Author: fhanik
Date: Thu Mar 9 18:04:49 2006
New Revision: 384676
URL: http://svn.apache.org/viewcvs?rev=384676&view=rev
Log:
Working on the replicated map
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384676&r1=384675&r2=384676&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Thu Mar 9 18:04:49 2006
@@ -30,14 +30,26 @@
import org.apache.catalina.tribes.ChannelListener;
import java.util.Collection;
import org.apache.catalina.tribes.MembershipListener;
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.catalina.tribes.mcast.McastMember;
+import java.util.Iterator;
+import org.apache.catalina.tribes.ChannelException;
+import java.util.LinkedList;
+import java.util.LinkedHashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
/**
+ * @todo implement periodic sync/transfer
* @author Filip Hanik
* @version 1.0
*/
public class LazyReplicatedMap extends LinkedHashMap
implements RpcCallback, ChannelListener, MembershipListener {
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class);
+ protected static long TIME_OUT = 15000;//hard coded timeout
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
@@ -45,6 +57,7 @@
private Channel channel;
private RpcChannel rpcChannel;
+ private byte[] mapContextName;
//------------------------------------------------------------------------------
@@ -69,13 +82,15 @@
final String chset = "ISO-8859-1";
this.channel = channel;
try {
- this.rpcChannel = new RpcChannel(mapContextName.getBytes(chset),
channel, this);
+ this.mapContextName = mapContextName.getBytes(chset);
}catch (UnsupportedEncodingException x) {
log.warn("Unable to encode mapContextName["+mapContextName+"]
using getBytes("+chset+") using default getBytes()",x);
- this.rpcChannel = new RpcChannel(mapContextName.getBytes(),
channel, this);
+ this.mapContextName = mapContextName.getBytes();
}
+ this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
this.channel.addChannelListener(this);
this.channel.addMembershipListener(this);
+ transferState();
}
@@ -93,18 +108,30 @@
}
this.rpcChannel = null;
this.channel = null;
+ super.clear();
}
//------------------------------------------------------------------------------
// GROUP COM INTERFACES
//------------------------------------------------------------------------------
+ public void transferState() {
+ throw new UnsupportedOperationException();
+ }
+
/**
- *
+ * @todo implement state transfer
* @param msg Serializable
* @return Serializable - null if no reply should be sent
*/
public Serializable replyRequest(Serializable msg, Member sender) {
- throw new UnsupportedOperationException();
+ if ( !(msg instanceof MapMessage) ) return null;
+ MapMessage mapmsg = (MapMessage)msg;
+ if ( mapmsg.getMsgType() != mapmsg.MSG_RETRIEVE_BACKUP ) return null;
+
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if ( entry == null ) return null;
+ mapmsg.setValue((Serializable)entry.getValue());
+ return mapmsg;
}
/**
@@ -114,30 +141,112 @@
* @param sender Member
*/
public void leftOver(Serializable msg, Member sender) {
- throw new UnsupportedOperationException();
+ //ignore left over responses
}
public void messageReceived(Serializable msg, Member sender) {
throw new UnsupportedOperationException();
+ //todo implement all the messages that we can receive
}
public boolean accept(Serializable msg, Member sender) {
- throw new UnsupportedOperationException();
+ if ( msg instanceof MapMessage ) {
+ return Arrays.equals(mapContextName,((MapMessage)msg).getMapId());
+ }
+ return false;
}
public void memberAdded(Member member) {
-
+ //do nothing, we don't care
}
public void memberDisappeared(Member member) {
-
+ //todo move all sessions that are primary here to and have this member
as
+ //a backup
+ Iterator i = super.entrySet().iterator();
+ while ( i.hasNext() ) {
+ Map.Entry e = (Map.Entry)i.next();
+ MapEntry entry = (MapEntry)e.getValue();
+ if ( entry.isPrimary() && member.equals(entry.getBackupNode())) {
+ try {
+ Member backup = publishEntryInfo(entry.getKey(),
entry.getValue());
+ entry.setBackupNode(backup);
+ }catch ( ChannelException x ) {
+ log.error("Unable to relocate["+entry.getKey()+"] to a new
backup node",x);
+ }
+ }//end if
+ }//while
+ }
+
+ int currentNode = 0;
+ public Member getNextBackupNode() {
+ Member[] members = channel.getMembers();
+ if ( members.length == 0 ) return null;
+ int node = currentNode++;
+ if ( node >= members.length ) {
+ node = 0;
+ currentNode = 0;
+ }
+ return members[node];
}
+
+
//------------------------------------------------------------------------------
// METHODS TO OVERRIDE
//------------------------------------------------------------------------------
-
+ /**
+ * publish info about a map pair (key/value) to other nodes in the cluster
+ * @param key Object
+ * @param value Object
+ * @return Member
+ * @throws ChannelException
+ */
+ protected Member publishEntryInfo(Object key, Object value) throws
ChannelException {
+ //select a backup node
+ Member backup = getNextBackupNode();
+ //publish the data out to all nodes
+ MapMessage msg = new MapMessage(this.mapContextName,
MapMessage.MSG_PROXY, false,
+ (Serializable) key, null, null,
backup);
+ channel.send(channel.getMembers(), msg);
+
+ //publish the backup data to one node
+ msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false,
+ (Serializable) key, (Serializable) value, null,
backup);
+ channel.send(new Member[] {backup}, msg);
+ return backup;
+ }
+
public Object get(Object key) {
- return super.get(key);
+ MapEntry entry = (MapEntry)super.get(key);
+ if ( entry == null ) return null;
+ if ( !entry.isPrimary() ) {
+ try {
+ MapMessage msg = new MapMessage(mapContextName,
MapMessage.MSG_RETRIEVE_BACKUP, false,
+ (Serializable) key, null,
null, null);
+ Response[] resp = rpcChannel.send(new Member[]
{entry.getBackupNode()},
+ msg,
this.rpcChannel.FIRST_REPLY, TIME_OUT);
+ if (resp == null || resp.length == 0) {
+ //no responses
+ log.warn("Unable to retrieve object for key:" + key);
+ return null;
+ }
+ msg = (MapMessage) resp[0].getMessage();
+
+ Member backup = entry.getBackupNode();
+ if (entry.isBackup()) {
+ //select a new backup node
+ backup = publishEntryInfo(key, msg.getValue());
+ }
+ entry.setBackupNode(backup);
+ entry.setBackup(false);
+ entry.setProxy(false);
+ entry.setValue(msg.getValue());
+ } catch (ChannelException x) {
+ log.error("Unable to replicate out data for a
LazyReplicatedMap.get operation", x);
+ return null;
+ }
+ }
+ return entry.getValue();
}
public boolean containsKey(Object key) {
@@ -145,19 +254,53 @@
}
public Object put(Object key, Object value) {
- return super.put(key,value);
+ if ( !(key instanceof Serializable) ) throw new
IllegalArgumentException("Key is not serializable:"+key.getClass().getName());
+ if ( value == null ) return remove(key);
+ if ( !(value instanceof Serializable) ) throw new
IllegalArgumentException("Value is not
serializable:"+value.getClass().getName());
+
+ MapEntry entry = new MapEntry((Serializable)key,(Serializable)value);
+ entry.setBackup(false);
+ entry.setProxy(false);
+
+ Object old = null;
+
+ //make sure that any old values get removed
+ if ( containsKey(key) ) old = (MapEntry)remove(key);
+ try {
+ Member backup = publishEntryInfo(key, value);
+ entry.setBackupNode(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to replicate out data for a
LazyReplicatedMap.put operation", x);
+ }
+ super.put(key,entry);
+ return old;
}
+
+
public void putAll(Map m) {
- super.putAll(m);
+ Iterator i = m.entrySet().iterator();
+ while ( i.hasNext() ) {
+ Map.Entry entry = (Map.Entry)i.next();
+ put(entry.getKey(),entry.getValue());
+ }
}
public Object remove(Object key) {
- return super.remove(key);
+ MapEntry entry = (MapEntry)super.remove(key);
+ MapMessage msg = new
MapMessage(mapContextName,MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
+ try {
+ channel.send(channel.getMembers(), msg);
+ } catch ( ChannelException x ) {
+ log.error("Unable to replicate out data for a
LazyReplicatedMap.remove operation",x);
+ }
+ return entry!=null?entry.getValue():null;
}
public void clear() {
- super.clear();
+ //only delete active keys
+ Iterator keys = keySet().iterator();
+ while ( keys.hasNext() ) remove(keys.next());
}
public boolean containsValue(Object value) {
@@ -165,19 +308,44 @@
}
public Object clone() {
- return super.clone();
+ throw new UnsupportedOperationException("This operation is not valid
on a replicated map");
}
public Set entrySet() {
- return super.entrySet();
+ LinkedHashSet set = new LinkedHashSet(super.size());
+ Iterator i = super.entrySet().iterator();
+ while ( i.hasNext() ) {
+ Map.Entry e = (Map.Entry)i.next();
+ MapEntry entry = (MapEntry)e.getValue();
+ if ( entry.isPrimary() ) set.add(entry.getValue());
+ }
+ return set;
}
public Set keySet() {
- return super.keySet();
+ //todo implement
+ //should only return keys where this is active.
+ LinkedHashSet set = new LinkedHashSet(super.size());
+ Iterator i = super.entrySet().iterator();
+ while ( i.hasNext() ) {
+ Map.Entry e = (Map.Entry)i.next();
+ MapEntry entry = (MapEntry)e.getValue();
+ if ( entry.isPrimary() ) set.add(entry.getKey());
+ }
+ return set;
}
public int size() {
- return super.size();
+ //todo, implement a counter variable instead
+ //only count active members in this node
+ int counter = 0;
+ Iterator i = super.entrySet().iterator();
+ while ( i.hasNext() ) {
+ Map.Entry e = (Map.Entry)i.next();
+ MapEntry entry = (MapEntry)e.getValue();
+ if ( entry.isPrimary() ) counter++;
+ }
+ return counter;
}
protected boolean removeEldestEntry(Map.Entry eldest) {
@@ -185,11 +353,18 @@
}
public boolean isEmpty() {
- return super.isEmpty();
+ return size()==0;
}
public Collection values() {
- return super.values();
+ ArrayList values = new ArrayList(super.size());
+ Iterator i = super.entrySet().iterator();
+ while ( i.hasNext() ) {
+ Map.Entry e = (Map.Entry)i.next();
+ MapEntry entry = (MapEntry)e.getValue();
+ if ( entry.isPrimary() ) values.add(entry.getValue());
+ }
+ return values;
}
@@ -220,6 +395,10 @@
public boolean isProxy() {
return proxy;
}
+
+ public boolean isPrimary() {
+ return ((!proxy) && (!backup));
+ }
public void setProxy(boolean proxy) {
this.proxy = proxy;
@@ -237,8 +416,6 @@
return backupNode;
}
-
-
public Object getValue() {
return value;
}
@@ -300,6 +477,131 @@
}
}
+//------------------------------------------------------------------------------
+// map message to send to and from other maps
+//------------------------------------------------------------------------------
+
+ public static class MapMessage implements Externalizable {
+ public static final int MSG_BACKUP = 1;
+ public static final int MSG_RETRIEVE_BACKUP = 2;
+ public static final int MSG_PROXY = 3;
+ public static final int MSG_REMOVE = 4;
+
+ private byte[] mapId;
+ private int msgtype;
+ private boolean diff;
+ private Serializable key;
+ private Serializable value;
+ private byte[] diffvalue;
+ private Member node;
+
+ public MapMessage(byte[] mapId,
+ int msgtype, boolean diff,
+ Serializable key,Serializable value,
+ byte[] diffvalue, Member node) {
+ this.mapId = mapId;
+ this.msgtype = msgtype;
+ this.diff = diff;
+ this.key = key;
+ this.value = value;
+ this.diffvalue = diffvalue;
+ this.node = node;
+ }
+
+ public int getMsgType() {
+ return msgtype;
+ }
+
+ public boolean isDiff() {
+ return diff;
+ }
+
+ public Serializable getKey() {
+ return key;
+ }
+
+ public Serializable getValue() {
+ return value;
+ }
+
+ public byte[] getDiffValue() {
+ return diffvalue;
+ }
+
+ public Member getBackupNode() {
+ return node;
+ }
+
+ public byte[] getMapId() {
+ return mapId;
+ }
+
+ public void setValue(Serializable value) {
+ this.value = value;
+ }
+
+ public void readExternal(ObjectInput in) throws
IOException,ClassNotFoundException {
+ mapId = new byte[in.readInt()];
+ in.read(mapId);
+ msgtype = in.readInt();
+ switch (msgtype) {
+ case MSG_BACKUP: {
+ diff = in.readBoolean();
+ key = (Serializable)in.readObject();
+ if ( diff ) {
+ diffvalue = new byte[in.readInt()];
+ in.read(diffvalue);
+ } else {
+ value = (Serializable)in.readObject();
+ }//endif
+ break;
+ }
+ case MSG_RETRIEVE_BACKUP:
+ case MSG_REMOVE : {
+ key = (Serializable)in.readObject();
+ break;
+ }
+ case MSG_PROXY: {
+ key = (Serializable)in.readObject();
+ byte[] d = new byte[in.readInt()];
+ in.read(d);
+ node = McastMember.getMember(d);
+ break;
+ }
+ }//switch
+ }//readExternal
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(mapId.length);
+ out.write(mapId);
+ out.writeInt(msgtype);
+ switch (msgtype) {
+ case MSG_BACKUP: {
+ out.writeBoolean(diff);
+ out.writeObject(key);
+ if ( diff ) {
+ out.writeInt(diffvalue.length);
+ out.write(diffvalue);
+ } else {
+ out.writeObject(value);
+ }//endif
+ break;
+ }
+ case MSG_RETRIEVE_BACKUP:
+ case MSG_REMOVE : {
+ out.writeObject(key);
+ break;
+ }
+ case MSG_PROXY: {
+ out.writeObject(key);
+ byte[] d = ((McastMember)node).getData(false);
+ out.writeInt(d.length);
+ out.write(d);
+ break;
+ }
+ }//switch
+ }//writeExternal
+ }//MapMessage
//------------------------------------------------------------------------------
// streamable class
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384676&r1=384675&r2=384676&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Thu Mar 9 18:04:49 2006
@@ -74,7 +74,7 @@
public Response[] send(Member[] destination,
Serializable message,
int options,
- long timeout) throws ChannelException,
InterruptedException {
+ long timeout) throws ChannelException {
if ( destination==null || destination.length == 0 ) return new
Response[0];
RpcCollectorKey key = new
RpcCollectorKey(UUIDGenerator.randomUUID(false));
@@ -86,6 +86,8 @@
channel.send(destination, rmsg);
collector.wait(timeout);
}
+ } catch ( InterruptedException ix ) {
+ throw new ChannelException(ix);
}finally {
responseMap.remove(key);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=384676&r1=384675&r2=384676&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Mar 9 18:04:49 2006
@@ -54,6 +54,12 @@
15. remove DataSenderFactory and DataSender.properties -
these cause the settings to be hard coded and not pluggable.
+
+16. Guaranteed delivery of messages, i.e. either all receivers get a message or none of them do.
+ This means that all receivers first get the message and then wait for a process command.
+
+17. Implement transactions - the ability to start a transaction, send several messages,
+ and then commit the transaction
Tasks Completed
===========================================
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]