Author: fhanik Date: Thu Mar 9 18:04:49 2006 New Revision: 384676 URL: http://svn.apache.org/viewcvs?rev=384676&view=rev Log: Working on the replicated map
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384676&r1=384675&r2=384676&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Mar 9 18:04:49 2006 @@ -30,14 +30,26 @@ import org.apache.catalina.tribes.ChannelListener; import java.util.Collection; import org.apache.catalina.tribes.MembershipListener; +import java.io.Externalizable; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.catalina.tribes.mcast.McastMember; +import java.util.Iterator; +import org.apache.catalina.tribes.ChannelException; +import java.util.LinkedList; +import java.util.LinkedHashSet; +import java.util.ArrayList; +import java.util.Arrays; /** + * @todo implement periodic sync/transfer * @author Filip Hanik * @version 1.0 */ public class LazyReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class); + protected static long TIME_OUT = 15000;//hard coded timeout //------------------------------------------------------------------------------ // INSTANCE VARIABLES @@ -45,6 +57,7 @@ private Channel channel; private RpcChannel rpcChannel; + private byte[] mapContextName; //------------------------------------------------------------------------------ @@ -69,13 +82,15 @@ final String chset = "ISO-8859-1"; this.channel = channel; try { - this.rpcChannel = new RpcChannel(mapContextName.getBytes(chset), channel, this); + this.mapContextName = mapContextName.getBytes(chset); }catch (UnsupportedEncodingException x) { log.warn("Unable to encode mapContextName["+mapContextName+"] using getBytes("+chset+") using default getBytes()",x); - this.rpcChannel = new RpcChannel(mapContextName.getBytes(), channel, this); + this.mapContextName = mapContextName.getBytes(); } + this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); this.channel.addChannelListener(this); this.channel.addMembershipListener(this); + transferState(); } @@ -93,18 +108,30 @@ } this.rpcChannel = null; this.channel = null; + super.clear(); } //------------------------------------------------------------------------------ // GROUP COM INTERFACES //------------------------------------------------------------------------------ + public void transferState() { + throw new UnsupportedOperationException(); + } + /** - * + * @todo implement state transfer * @param msg Serializable * @return Serializable - null if no reply should be sent */ public Serializable replyRequest(Serializable msg, Member sender) { - throw new UnsupportedOperationException(); + if ( !(msg instanceof MapMessage) ) return null; + MapMessage mapmsg = (MapMessage)msg; + if ( mapmsg.getMsgType() != mapmsg.MSG_RETRIEVE_BACKUP ) return null; + + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if ( entry == null ) return null; + mapmsg.setValue((Serializable)entry.getValue()); + return mapmsg; } /** @@ -114,30 +141,112 @@ * @param sender Member */ public void leftOver(Serializable msg, Member sender) { - throw new UnsupportedOperationException(); + //ignore left over responses } public void messageReceived(Serializable msg, Member sender) { throw new UnsupportedOperationException(); + //todo implement all the messages that we can receive } public boolean accept(Serializable msg, Member sender) { - throw new UnsupportedOperationException(); + if ( msg instanceof MapMessage ) { + return Arrays.equals(mapContextName,((MapMessage)msg).getMapId()); + } + return false; } public void memberAdded(Member member) { - + //do nothing, we don't care } public void memberDisappeared(Member member) { - + //todo move all sessions that are primary here to and have this member as + //a backup + Iterator i = super.entrySet().iterator(); + while ( i.hasNext() ) { + Map.Entry e = (Map.Entry)i.next(); + MapEntry entry = (MapEntry)e.getValue(); + if ( entry.isPrimary() && member.equals(entry.getBackupNode())) { + try { + Member backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNode(backup); + }catch ( ChannelException x ) { + log.error("Unable to relocate["+entry.getKey()+"] to a new backup node",x); + } + }//end if + }//while + } + + int currentNode = 0; + public Member getNextBackupNode() { + Member[] members = channel.getMembers(); + if ( members.length == 0 ) return null; + int node = currentNode++; + if ( node >= members.length ) { + node = 0; + currentNode = 0; + } + return members[node]; } + + //------------------------------------------------------------------------------ // METHODS TO OVERRIDE //------------------------------------------------------------------------------ - + /** + * publish info about a map pair (key/value) to other nodes in the cluster + * @param key Object + * @param value Object + * @return Member + * @throws ChannelException + */ + protected Member publishEntryInfo(Object key, Object value) throws ChannelException { + //select a backup node + Member backup = getNextBackupNode(); + //publish the data out to all nodes + MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PROXY, false, + (Serializable) key, null, null, backup); + channel.send(channel.getMembers(), msg); + + //publish the backup data to one node + msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false, + (Serializable) key, (Serializable) value, null, backup); + channel.send(new Member[] {backup}, msg); + return backup; + } + public Object get(Object key) { - return super.get(key); + MapEntry entry = (MapEntry)super.get(key); + if ( entry == null ) return null; + if ( !entry.isPrimary() ) { + try { + MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_RETRIEVE_BACKUP, false, + (Serializable) key, null, null, null); + Response[] resp = rpcChannel.send(new Member[] {entry.getBackupNode()}, + msg, this.rpcChannel.FIRST_REPLY, TIME_OUT); + if (resp == null || resp.length == 0) { + //no responses + log.warn("Unable to retrieve object for key:" + key); + return null; + } + msg = (MapMessage) resp[0].getMessage(); + + Member backup = entry.getBackupNode(); + if (entry.isBackup()) { + //select a new backup node + backup = publishEntryInfo(key, msg.getValue()); + } + entry.setBackupNode(backup); + entry.setBackup(false); + entry.setProxy(false); + entry.setValue(msg.getValue()); + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); + return null; + } + } + return entry.getValue(); } public boolean containsKey(Object key) { @@ -145,19 +254,53 @@ } public Object put(Object key, Object value) { - return super.put(key,value); + if ( !(key instanceof Serializable) ) throw new IllegalArgumentException("Key is not serializable:"+key.getClass().getName()); + if ( value == null ) return remove(key); + if ( !(value instanceof Serializable) ) throw new IllegalArgumentException("Value is not serializable:"+value.getClass().getName()); + + MapEntry entry = new MapEntry((Serializable)key,(Serializable)value); + entry.setBackup(false); + entry.setProxy(false); + + Object old = null; + + //make sure that any old values get removed + if ( containsKey(key) ) old = (MapEntry)remove(key); + try { + Member backup = publishEntryInfo(key, value); + entry.setBackupNode(backup); + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); + } + super.put(key,entry); + return old; } + + public void putAll(Map m) { - super.putAll(m); + Iterator i = m.entrySet().iterator(); + while ( i.hasNext() ) { + Map.Entry entry = (Map.Entry)i.next(); + put(entry.getKey(),entry.getValue()); + } } public Object remove(Object key) { - return super.remove(key); + MapEntry entry = (MapEntry)super.remove(key); + MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null); + try { + channel.send(channel.getMembers(), msg); + } catch ( ChannelException x ) { + log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x); + } + return entry!=null?entry.getValue():null; } public void clear() { - super.clear(); + //only delete active keys + Iterator keys = keySet().iterator(); + while ( keys.hasNext() ) remove(keys.next()); } public boolean containsValue(Object value) { @@ -165,19 +308,44 @@ } public Object clone() { - return super.clone(); + throw new UnsupportedOperationException("This operation is not valid on a replicated map"); } public Set entrySet() { - return super.entrySet(); + LinkedHashSet set = new LinkedHashSet(super.size()); + Iterator i = super.entrySet().iterator(); + while ( i.hasNext() ) { + Map.Entry e = (Map.Entry)i.next(); + MapEntry entry = (MapEntry)e.getValue(); + if ( entry.isPrimary() ) set.add(entry.getValue()); + } + return set; } public Set keySet() { - return super.keySet(); + //todo implement + //should only return keys where this is active. + LinkedHashSet set = new LinkedHashSet(super.size()); + Iterator i = super.entrySet().iterator(); + while ( i.hasNext() ) { + Map.Entry e = (Map.Entry)i.next(); + MapEntry entry = (MapEntry)e.getValue(); + if ( entry.isPrimary() ) set.add(entry.getKey()); + } + return set; } public int size() { - return super.size(); + //todo, implement a counter variable instead + //only count active members in this node + int counter = 0; + Iterator i = super.entrySet().iterator(); + while ( i.hasNext() ) { + Map.Entry e = (Map.Entry)i.next(); + MapEntry entry = (MapEntry)e.getValue(); + if ( entry.isPrimary() ) counter++; + } + return counter; } protected boolean removeEldestEntry(Map.Entry eldest) { @@ -185,11 +353,18 @@ } public boolean isEmpty() { - return super.isEmpty(); + return size()==0; } public Collection values() { - return super.values(); + ArrayList values = new ArrayList(super.size()); + Iterator i = super.entrySet().iterator(); + while ( i.hasNext() ) { + Map.Entry e = (Map.Entry)i.next(); + MapEntry entry = (MapEntry)e.getValue(); + if ( entry.isPrimary() ) values.add(entry.getValue()); + } + return values; } @@ -220,6 +395,10 @@ public boolean isProxy() { return proxy; } + + public boolean isPrimary() { + return ((!proxy) && (!backup)); + } public void setProxy(boolean proxy) { this.proxy = proxy; @@ -237,8 +416,6 @@ return backupNode; } - - public Object getValue() { return value; } @@ -300,6 +477,131 @@ } } +//------------------------------------------------------------------------------ +// map message to send to and from other maps +//------------------------------------------------------------------------------ + + public static class MapMessage implements Externalizable { + public static final int MSG_BACKUP = 1; + public static final int MSG_RETRIEVE_BACKUP = 2; + public static final int MSG_PROXY = 3; + public static final int MSG_REMOVE = 4; + + private byte[] mapId; + private int msgtype; + private boolean diff; + private Serializable key; + private Serializable value; + private byte[] diffvalue; + private Member node; + + public MapMessage(byte[] mapId, + int msgtype, boolean diff, + Serializable key,Serializable value, + byte[] diffvalue, Member node) { + this.mapId = mapId; + this.msgtype = msgtype; + this.diff = diff; + this.key = key; + this.value = value; + this.diffvalue = diffvalue; + this.node = node; + } + + public int getMsgType() { + return msgtype; + } + + public boolean isDiff() { + return diff; + } + + public Serializable getKey() { + return key; + } + + public Serializable getValue() { + return value; + } + + public byte[] getDiffValue() { + return diffvalue; + } + + public Member getBackupNode() { + return node; + } + + public byte[] getMapId() { + return mapId; + } + + public void setValue(Serializable value) { + this.value = value; + } + + public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + mapId = new byte[in.readInt()]; + in.read(mapId); + msgtype = in.readInt(); + switch (msgtype) { + case MSG_BACKUP: { + diff = in.readBoolean(); + key = (Serializable)in.readObject(); + if ( diff ) { + diffvalue = new byte[in.readInt()]; + in.read(diffvalue); + } else { + value = (Serializable)in.readObject(); + }//endif + break; + } + case MSG_RETRIEVE_BACKUP: + case MSG_REMOVE : { + key = (Serializable)in.readObject(); + break; + } + case MSG_PROXY: { + key = (Serializable)in.readObject(); + byte[] d = new byte[in.readInt()]; + in.read(d); + node = McastMember.getMember(d); + break; + } + }//switch + }//readExternal + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(mapId.length); + out.write(mapId); + out.writeInt(msgtype); + switch (msgtype) { + case MSG_BACKUP: { + out.writeBoolean(diff); + out.writeObject(key); + if ( diff ) { + out.writeInt(diffvalue.length); + out.write(diffvalue); + } else { + out.writeObject(value); + }//endif + break; + } + case MSG_RETRIEVE_BACKUP: + case MSG_REMOVE : { + out.writeObject(key); + break; + } + case MSG_PROXY: { + out.writeObject(key); + byte[] d = ((McastMember)node).getData(false); + out.writeInt(d.length); + out.write(d); + break; + } + }//switch + }//writeExternal + }//MapMessage //------------------------------------------------------------------------------ // streamable class Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384676&r1=384675&r2=384676&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Thu Mar 9 18:04:49 2006 @@ -74,7 +74,7 @@ public Response[] send(Member[] destination, Serializable message, int options, - long timeout) throws ChannelException, InterruptedException { + long timeout) throws ChannelException { if ( destination==null || destination.length == 0 ) return new Response[0]; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); @@ -86,6 +86,8 @@ channel.send(destination, rmsg); collector.wait(timeout); } + } catch ( InterruptedException ix ) { + throw new ChannelException(ix); }finally { responseMap.remove(key); } Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=384676&r1=384675&r2=384676&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Mar 9 18:04:49 2006 @@ -54,6 +54,12 @@ 15. remove DataSenderFactory and DataSender.properties - these cause the settings to be hard coded ant not pluggable. + +16. Guaranteed delivery of messages, ie either all get it or none get it. + Meaning, that all receivers get it, then wait for a process command. + +17. Implement transactions - the ability to start a transaction, send several messages, + and then commit the transaction Tasks Completed =========================================== --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]