Author: fhanik Date: Tue Mar 21 08:07:12 2006 New Revision: 387560 URL: http://svn.apache.org/viewcvs?rev=387560&view=rev Log: Added documentation for the ReplicatedMapEntry and added the async replication to the demos
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java Tue Mar 21 08:07:12 2006 @@ -72,6 +72,15 @@ public int getOptions(); public void setOptions(int options); + /** + * Shallow clone, only the actual message(getMessage()) is cloned, the rest remains as references + * @return ChannelMessage + */ public ChannelMessage clone(); + /** + * Deep clone, 
everything gets cloned + * @return ChannelMessage + */ + public ChannelMessage deepclone(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Tue Mar 21 08:07:12 2006 @@ -15,6 +15,8 @@ package org.apache.catalina.tribes.group.interceptors; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; @@ -23,7 +25,6 @@ import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.tcp.bio.util.FastQueue; import org.apache.catalina.tribes.tcp.bio.util.LinkObject; -import java.util.concurrent.atomic.AtomicLong; /** * @@ -44,22 +45,23 @@ private boolean run = false; private Thread msgDispatchThread = null; private AtomicLong currentSize = new AtomicLong(0); - + private boolean useDeepClone = false; public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS; if ( async && run ) { if ( (currentSize.get()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+currentSize+" bytes."); //add to queue - 
queue.add(msg, destination, payload); + if ( useDeepClone ) msg = msg.deepclone(); + if (!queue.add(msg, destination, payload) ) { + throw new ChannelException("Unable to add the message to the async queue, queue bug?"); + } currentSize.addAndGet(msg.getMessage().getLength()); } else { super.sendMessage(destination, msg, payload); } } - - public void messageReceived(ChannelMessage msg) { super.messageReceived(msg); } @@ -78,10 +80,18 @@ this.maxQueueSize = maxQueueSize; } + public void setUseDeepClone(boolean useDeepClone) { + this.useDeepClone = useDeepClone; + } + public long getMaxQueueSize() { return maxQueueSize; } - + + public boolean getUseDeepClone() { + return useDeepClone; + } + public void start(int svc) throws ChannelException { //start the thread if (!run ) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java Tue Mar 21 08:07:12 2006 @@ -174,6 +174,7 @@ enabled = enable; if (!enabled) { lock.abortRemove(); + last = first = null; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Mar 21 08:07:12 2006 @@ -55,6 +55,16 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener { protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); + /** + * The default initial capacity - MUST be a power of two. + */ + public static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The load factor used when none specified in constructor. + **/ + public static final float DEFAULT_LOAD_FACTOR = 0.75f; + //------------------------------------------------------------------------------ // INSTANCE VARIABLES //------------------------------------------------------------------------------ @@ -65,8 +75,8 @@ private transient boolean stateTransferred = false; private transient Object stateMutex = new Object(); private transient ArrayList mapMembers = new ArrayList(); - private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; + private transient Object mapOwner; //------------------------------------------------------------------------------ // CONSTRUCTORS @@ -80,52 +90,24 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public AbstractReplicatedMap(Channel channel, + public AbstractReplicatedMap(Object owner, + Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, int channelSendOptions) { super(initialCapacity, loadFactor); - init(channel, mapContextName, timeout, channelSendOptions); + init(owner, channel, mapContextName, timeout, channelSendOptions); } - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param 
mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - */ - public AbstractReplicatedMap(Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - int channelSendOptions) { - super(initialCapacity); - init(channel, mapContextName, timeout, channelSendOptions); - } - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - */ - public AbstractReplicatedMap(Channel channel, - long timeout, - String mapContextName, - int channelSendOptions) { - super(); - init(channel, mapContextName, timeout, channelSendOptions); - } - protected Member[] wrap(Member m) { return new Member[] {m}; } - private void init(Channel channel, String mapContextName, long timeout, int channelSendOptions) { + private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) { + this.mapOwner = owner; final String chset = "ISO-8859-1"; this.channelSendOptions = channelSendOptions; this.channel = channel; @@ -277,9 +259,12 @@ //make sure we don't store that actual object as primary or backup MapEntry local = (MapEntry)super.get(m.getKey()); - if (local != null && (!local.isProxy()))continue; + if (local != null && (!local.isProxy())) continue; //store the object + if (m.getValue()!=null && m.getValue() instanceof ReplicatedMapEntry ) { + ((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner()); + } MapEntry entry = new MapEntry(m.getKey(), m.getValue()); entry.setBackup(false); entry.setProxy(true); @@ -390,6 +375,9 @@ entry.setBackup(true); entry.setProxy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); + if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { + ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); 
+ } super.put(entry.getKey(), entry); } else { entry.setBackup(true); @@ -400,7 +388,7 @@ if (mapmsg.isDiff()) { try { diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); - } catch (IOException x) { + } catch (Exception x) { log.error("Unable to apply diff to key:" + entry.getKey(), x); } } else { @@ -865,6 +853,14 @@ public boolean isStateTransferred() { return stateTransferred; + } + + public Object getMapOwner() { + return mapOwner; + } + + public void setMapOwner(Object mapOwner) { + this.mapOwner = mapOwner; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Tue Mar 21 08:07:12 2006 @@ -84,8 +84,8 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) { - super(channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT); + public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) { + super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT); } /** @@ -95,8 +95,8 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ - public 
LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) { - super(channel,timeout,mapContextName,initialCapacity, Channel.SEND_OPTIONS_DEFAULT); + public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { + super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT); } /** @@ -105,8 +105,8 @@ * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ - public LazyReplicatedMap(Channel channel, long timeout, String mapContextName) { - super(channel,timeout,mapContextName, Channel.SEND_OPTIONS_DEFAULT); + public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) { + super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Mar 21 08:07:12 2006 @@ -66,9 +66,8 @@ * @param initialCapacity int - the size of this map, see HashMap * @param loadFactor float - load factor, see HashMap */ - public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, - float loadFactor) { - super(channel, timeout, mapContextName, initialCapacity, loadFactor, 
Channel.SEND_OPTIONS_DEFAULT); + public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) { + super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT); } /** @@ -78,8 +77,8 @@ * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap */ - public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) { - super(channel, timeout, mapContextName, initialCapacity, Channel.SEND_OPTIONS_DEFAULT); + public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) { + super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT); } /** @@ -88,8 +87,8 @@ * @param timeout long - timeout for RPC messags * @param mapContextName String - unique name for this map, to allow multiple maps per channel */ - public ReplicatedMap(Channel channel, long timeout, String mapContextName) { - super(channel, timeout, mapContextName, Channel.SEND_OPTIONS_DEFAULT); + public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) { + super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT); } //------------------------------------------------------------------------------ Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java Tue Mar 21 08:07:12 2006 @@ -15,13 +15,34 @@ */ package org.apache.catalina.tribes.tipis; -import java.io.Serializable; import java.io.IOException; -import org.apache.catalina.tribes.tcp.*; +import java.io.Serializable; /** * - * For smarter replication, an object can implement this interface to replicate diffs + * For smarter replication, an object can implement this interface to replicate diffs<br> + * The replication logic will call the methods in the following order:<br> + * <code> + * 1. if ( entry.isDirty() ) {<br> + * try {<br> + * 2. entry.lock();<br> + * 3. byte[] diff = entry.getDiff();<br> + * 4. entry.resetDiff();<br> + * } finally {<br> + * 5. entry.unlock();<br> + * }<br> + * }<br> + * </code> + * <br> + * <br> + * When the data is deserialized the logic is called in the following order<br> + * <code> + * 1. ReplicatedMapEntry entry = (ReplicatedMapEntry)objectIn.readObject();<br> + * 2. if ( isBackup(entry)||isPrimary(entry) ) entry.setOwner(owner); <br> + * </code> + * <br> + * + * * @author Filip Hanik * @version 1.0 */ @@ -34,8 +55,6 @@ */ public boolean isDirty(); - public boolean setDirty(boolean dirty); - /** * If this returns true, the map will extract the diff using getDiff() * Otherwise it will serialize the entire object. 
@@ -58,7 +77,33 @@ * @param length int * @throws IOException */ - public void applyDiff(byte[] diff, int offset, int length) throws IOException; + public void applyDiff(byte[] diff, int offset, int length) throws IOException, ClassNotFoundException; + + /** + * Resets the current diff state and resets the dirty flag + */ + public void resetDiff(); + + /** + * Lock during serialization + */ + public void lock(); + + /** + * Unlock after serialization + */ + public void unlock(); + + /** + * This method is called after the object has been + * created on a remote map. In this method, + * the object can initialize itself for any data that wasn't serialized. + * + * @param owner Object + */ + public void setOwner(Object owner); + + } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Mar 21 08:07:12 2006 @@ -30,6 +30,7 @@ import org.apache.catalina.tribes.tcp.ReplicationTransmitter; import org.apache.tomcat.util.IntrospectionUtils; import org.apache.catalina.tribes.tcp.*; +import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; /** * <p>Title: </p> @@ -64,7 +65,9 @@ .append("\n\t\t[-order]") .append("\n\t\t[-ordersize maxorderqueuesize]") .append("\n\t\t[-frag]") - .append("\n\t\t[-fragsize maxmsgsize]"); + .append("\n\t\t[-fragsize maxmsgsize]") + .append("\n\t\t[-async]") + .append("\n\t\t[-asyncsize maxqueuesizeinbytes]"); return buf; } @@ -89,6 +92,8 @@ Properties transportProperties = new Properties(); String transport = 
"org.apache.catalina.tribes.tcp.nio.PooledParallelSender"; String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver"; + boolean async = false; + int asyncsize = 1024*1024*50; //50MB for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { @@ -103,6 +108,11 @@ tcpthreadcount = Integer.parseInt(args[++i]); } else if ("-gzip".equals(args[i])) { gzip = true; + } else if ("-async".equals(args[i])) { + async = true; + } else if ("-asyncsize".equals(args[i])) { + asyncsize = Integer.parseInt(args[++i]); + System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize); } else if ("-order".equals(args[i])) { order = true; } else if ("-ordersize".equals(args[i])) { @@ -187,6 +197,13 @@ oi.setMaxQueue(ordersize); channel.addInterceptor(oi); } + + if ( async ) { + MessageDispatchInterceptor mi = new MessageDispatchInterceptor(); + mi.setMaxQueueSize(asyncsize); + channel.addInterceptor(mi); + } + return channel; } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Tue Mar 21 08:07:12 2006 @@ -55,6 +55,7 @@ public int statsInterval = 10000; public long pause = 0; public boolean breakonChannelException = false; + public boolean async = false; public long receiveStart = 0; public int channelOptions = Channel.SEND_OPTIONS_DEFAULT; @@ -144,7 +145,7 @@ Thread.sleep(pause); } } catch (ChannelException x) { - log.error("Unable to send message."); + log.error("Unable to send message:"+x.getMessage()); Member[] faulty = x.getFaultyMembers(); for 
(int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]); --counter; Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 08:07:12 2006 @@ -45,7 +45,7 @@ protected SimpleTableDemo table; public MapDemo(Channel channel ) { - map = new LazyReplicatedMap(channel,5000, "MapDemo"); + map = new LazyReplicatedMap(this,channel,5000, "MapDemo"); table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName()); channel.addChannelListener(this); channel.addMembershipListener(this); Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=387560&r1=387559&r2=387560&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue Mar 21 08:07:12 2006 @@ -29,6 +29,12 @@ Code Tasks: =========================================== +32. Replicated JNDI entries in Tomcat in the format + cluster:<map name>/<entry key> for example + cluster:myapps/db/shared/dbinfo + +31. A layer on top of the GroupChannel, to allow multiple processes share + a channel to send/receive message to conserve system resources - this is way in the future. 30. 
CookieBasedReplicationMap - a very simple extension to the LazyReplicatedMap but instead of randomly selecting a backup node and then publishing the PROXY to all @@ -50,17 +56,6 @@ 25. Member.uniqueId - 16 bytes unique for a member, UUID Needed to not confuse a crashed member with a revived member on the same port -24. MessageDispatchInterceptor - for asynchronous sending - - looks at the options flag SEND_OPTIONS_ASYNCHRONOUS - - has two modes - a) async parallel send - each message to all destinations before next message - b) async per/member - one thread per member using the FastAsyncQueue (good for groups with slow receivers) - - (optional)persistent - writes messages to persistent store first, then starts processing - - Callback error handler - for when messages fail, and the application wishes to become notified - - MUST HAVE A LIMIT QUEUE SIZE IN MB, to avoid OOM errors or persist the queue. - - MUST USE ClusterData.deepclone() to ensure thread safety if ClusterData objects get recycled - - 23. TotalOrderInterceptor - fairly straight forward implementation This interceptor would depend on the fact that there is some sort of membership coordinator, see task 9. @@ -131,7 +126,8 @@ 16. Guaranteed delivery of messages, ie either all get it or none get it. Meaning, that all receivers get it, then wait for a process command. - ala Gossip protocol + ala Gossip protocol - this is fairly redundant with a Xa2PhaseCommitInterceptor + except it doesn't keep a transaction log. 17. Implement transactions - the ability to start a transaction, send several messages, and then commit the transaction @@ -192,3 +188,14 @@ Notes: see Channel.SEND_OPT_XXXX variables 28. Thread pool should have maxThreads and minThreads and grow dynamically + +24. 
MessageDispatchInterceptor - for asynchronous sending + - looks at the options flag SEND_OPTIONS_ASYNCHRONOUS + - has two modes + a) async parallel send - each message to all destinations before next message + b) async per/member - one thread per member using the FastAsyncQueue (good for groups with slow receivers) + - Callback error handler - for when messages fail, and the application wishes to become notified + - MUST HAVE A LIMIT QUEUE SIZE IN MB, to avoid OOM errors or persist the queue. + - MUST USE ClusterData.deepclone() to ensure thread safety if ClusterData objects get recycled +Notes: Simple implementation, one thread, invokes all senders in parallel. + Deep cloning is configurable as optimization. --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]