Author: fhanik Date: Tue Mar 7 13:46:32 2006 New Revision: 384023 URL: http://svn.apache.org/viewcvs?rev=384023&view=rev Log: Finished the Rpc channel and a sample demo
Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384023&r1=384022&r2=384023&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Tue Mar 7 13:46:32 2006 @@ -35,6 +35,7 @@ * @author Filip Hanik */ public class RpcChannel implements ChannelListener{ + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class); public static final int FIRST_REPLY = 1; public static final int MAJORITY_REPLY = 2; @@ -57,7 +58,7 @@ this.channel = channel; this.callback = callback; this.rpcId = rpcId; - //channel.addChannelListener(this); + channel.addChannelListener(this); } @@ -75,6 +76,7 @@ int options, long timeout) throws ChannelException, InterruptedException { + if ( destination==null || destination.length == 0 ) return new Response[0]; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); RpcCollector collector = new RpcCollector(key,options,destination.length,timeout); synchronized (collector) { @@ -90,16 +92,26 @@ public void messageReceived(Serializable msg, Member sender) { RpcMessage rmsg = (RpcMessage)msg; RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); - RpcCollector collector = (RpcCollector)responseMap.get(key); - if ( collector == null ) { - callback.leftOver(rmsg.message,sender); - } else { - synchronized (collector) { - collector.addResponse(rmsg.message,sender); - if ( collector.isComplete() ) collector.notifyAll(); + if ( rmsg.reply ) { + RpcCollector collector = (RpcCollector)responseMap.get(key); + if (collector == null) { + callback.leftOver(rmsg.message, sender); + } else { + synchronized (collector) { + collector.addResponse(rmsg.message, sender); + if (collector.isComplete()) collector.notifyAll(); + }//synchronized + }//end if + } else{ + Serializable reply = callback.replyRequest(rmsg.message,sender); + rmsg.reply = true; + rmsg.message = reply; + try { + channel.send(new Member[] {sender}, rmsg); + }catch ( Exception x ) { + log.error("Unable to send back reply in RpcChannel.",x); } - } - + }//end if } public boolean accept(Serializable msg, Member sender) { @@ -138,6 +150,11 @@ private Serializable message; private byte[] uuid; private byte[] rpcId; + private boolean reply = false; + + public RpcMessage() { + //for serialization + } public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { this.rpcId = rpcId; @@ -146,6 +163,7 @@ } public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + reply = in.readBoolean(); int length = in.readInt(); uuid = new byte[length]; in.read(uuid, 0, length); @@ -156,6 +174,7 @@ } public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(reply); out.writeInt(uuid.length); out.write(uuid, 0, uuid.length); out.writeInt(rpcId.length); Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java?rev=384023&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Tue Mar 7 13:46:32 2006 @@ -0,0 +1,202 @@ +package org.apache.catalina.tribes.demos; + +import java.io.Serializable; + +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.tipis.RpcCallback; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.tipis.RpcChannel; +import org.apache.catalina.tribes.tipis.Response; + + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class EchoRpcTest implements RpcCallback, Runnable { + + Channel channel; + int count; + String message; + long pause; + RpcChannel rpc; + int options; + long timeout; + + public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) { + this.channel = channel; + this.count = count; + this.message = message; + this.pause = pause; + this.options = options; + this.rpc = new RpcChannel(name.getBytes(),channel,this); + this.timeout = timeout; + } + + /** + * If the reply has already been sent to the requesting thread, the rpc + * callback can handle any data that comes in after the fact. + * + * @param msg Serializable + * @param sender Member + * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback + * method + */ + public void leftOver(Serializable msg, Member sender) { + System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]"); + } + + /** + * + * @param msg Serializable + * @param sender Member + * @return Serializable - null if no reply should be sent + * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback + * method + */ + public Serializable replyRequest(Serializable msg, Member sender) { + System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]"); + return "Reply:"+msg; + } + + public void run() { + long counter = 0; + while (counter<count) { + String msg = message + " cnt="+(++counter); + try { + System.out.println("Sending ["+msg+"]"); + long start = System.currentTimeMillis(); + Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,timeout); + System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms."); + for ( int i=0; i<resp.length; i++ ) { + System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]"); + } + Thread.sleep(pause); + }catch(Exception x){} + } + } + + public static void usage() { + System.out.println("Tribes RPC tester."); + System.out.println("Usage:\n\t"+ + "java EchoRpcTest [options]\n\t"+ + "Options:\n\t\t"+ + "[-mode all|first|majority] \n\t\t"+ + "[-debug] \n\t\t"+ + "[-count messagecount] \n\t\t"+ + "[-timeout timeoutinms] \n\t\t"+ + "[-stats statinterval] \n\t\t"+ + "[-pause nrofsecondstopausebetweensends] \n\t\t"+ + "[-message message] \n\t\t"+ + "[-name rpcname] \n\t\t"+ + "[-break (halts execution on exception)]\n"+ + "\tChannel options:"+ + ChannelCreator.usage()+"\n\n"+ + "Example:\n\t"+ + "java EchoRpcTest -port 4004\n\t"+ + "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"+ + "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n"); + } + + public static void main(String[] args) throws Exception { + boolean send = true; + boolean debug = false; + long pause = 3000; + int count = 1000000; + int stats = 10000; + String name = "EchoRpcId"; + boolean breakOnEx = false; + int threads = 1; + int options = RpcChannel.ALL_REPLY; + long timeout = 15000; + String message = "EchoRpcMessage"; + if ( args.length == 0 ) { + args = new String[] {"-help"}; + } + for (int i = 0; i < args.length; i++) { + if ("-threads".equals(args[i])) { + threads = Integer.parseInt(args[++i]); + } else if ("-count".equals(args[i])) { + count = Integer.parseInt(args[++i]); + System.out.println("Sending "+count+" messages."); + } else if ("-pause".equals(args[i])) { + pause = Long.parseLong(args[++i])*1000; + } else if ("-break".equals(args[i])) { + breakOnEx = true; + } else if ("-stats".equals(args[i])) { + stats = Integer.parseInt(args[++i]); + System.out.println("Stats every "+stats+" message"); + } else if ("-timeout".equals(args[i])) { + timeout = Long.parseLong(args[++i]); + } else if ("-message".equals(args[i])) { + message = args[++i]; + } else if ("-name".equals(args[i])) { + name = args[++i]; + } else if ("-mode".equals(args[i])) { + if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY; + else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY; + else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY; + } else if ("-debug".equals(args[i])) { + debug = true; + } else if ("-help".equals(args[i])) + { + usage(); + System.exit(1); + } + } + + + ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args); + EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout); + channel.start(channel.DEFAULT); + Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); + test.run(); + + System.out.println("System test complete, sleeping to let threads finish."); + Thread.sleep(60*1000*60); + } + + public static class Shutdown extends Thread { + ManagedChannel channel = null; + public Shutdown(ManagedChannel channel) { + this.channel = channel; + } + + public void run() { + System.out.println("Shutting down..."); + SystemExit exit = new SystemExit(5000); + exit.setDaemon(true); + exit.start(); + try { + channel.stop(channel.DEFAULT); + + }catch ( Exception x ) { + x.printStackTrace(); + } + System.out.println("Channel stopped."); + } + } + public static class SystemExit extends Thread { + private long delay; + public SystemExit(long delay) { + this.delay = delay; + } + public void run () { + try { + Thread.sleep(delay); + }catch ( Exception x ) { + x.printStackTrace(); + } + System.exit(0); + + } + }} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]