Author: fhanik Date: Fri Apr 14 10:24:08 2006 New Revision: 394147 URL: http://svn.apache.org/viewcvs?rev=394147&view=rev Log: Moved RpcChannel to the group implementation
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=394147&r1=394146&r2=394147&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Apr 14 10:24:08 2006 @@ -34,6 +34,11 @@ import org.apache.catalina.tribes.MembershipService; import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; +import java.io.ObjectInput; +import java.io.Externalizable; + +import java.io.IOException; +import java.io.ObjectOutput; /** * The GroupChannel manages the replication channel. It coordinates @@ -286,5 +291,14 @@ public boolean getOptionCheck() { return optionCheck; } + + + public static class NoChannelReply extends RpcMessage { + public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + } + + public void writeExternal(ObjectOutput out) throws IOException { + } + } } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=394147&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java Fri Apr 14 10:24:08 2006 @@ -0,0 +1,251 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.group; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.util.UUIDGenerator; +import org.apache.catalina.tribes.tipis.*; + +/** + * A channel to handle RPC messaging + * @author Filip Hanik + */ +public class RpcChannel implements ChannelListener{ + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class); + + public static final int FIRST_REPLY = 1; + public static final int MAJORITY_REPLY = 2; + public static final int ALL_REPLY = 3; + + private Channel channel; + private RpcCallback callback; + private byte[] rpcId; + + private HashMap responseMap = new HashMap(); + + /** + * Create an RPC channel. You can have several RPC channels attached to a group + * all separated out by the uniqueness + * @param rpcId - the unique Id for this RPC group + * @param channel Channel + * @param callback RpcCallback + */ + public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { + this.channel = channel; + this.callback = callback; + this.rpcId = rpcId; + channel.addChannelListener(this); + } + + + /** + * Send a message and wait for the response. + * @param destination Member[] - the destination for the message, and the members you request a reply from + * @param message Serializable - the message you are sending out + * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY + * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned + * @return Response[] - an array of response objects. + * @throws ChannelException + */ + public Response[] send(Member[] destination, + Serializable message, + int rpcOptions, + int channelOptions, + long timeout) throws ChannelException { + + if ( destination==null || destination.length == 0 ) return new Response[0]; + + //avoid dead lock + channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; + + RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); + RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout); + try { + synchronized (collector) { + responseMap.put(key, collector); + RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); + channel.send(destination, rmsg, channelOptions); + collector.wait(timeout); + } + } catch ( InterruptedException ix ) { + Thread.currentThread().interrupted(); + throw new ChannelException(ix); + }finally { + responseMap.remove(key); + } + return collector.getResponses(); + } + + public void messageReceived(Serializable msg, Member sender) { + RpcMessage rmsg = (RpcMessage)msg; + RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); + if ( rmsg.reply ) { + RpcCollector collector = (RpcCollector)responseMap.get(key); + if (collector == null) { + callback.leftOver(rmsg.message, sender); + } else { + synchronized (collector) { + //make sure it hasn't been removed + if ( responseMap.containsKey(key) ) { + collector.addResponse(rmsg.message, sender); + if (collector.isComplete()) collector.notifyAll(); + } else { + callback.leftOver(rmsg.message, sender); + } + }//synchronized + }//end if + } else{ + Serializable reply = callback.replyRequest(rmsg.message,sender); + rmsg.reply = true; + rmsg.message = reply; + try { + channel.send(new Member[] {sender}, rmsg,0); + }catch ( Exception x ) { + log.error("Unable to send back reply in RpcChannel.",x); + } + }//end if + } + + public void breakdown() { + channel.removeChannelListener(this); + } + + public void finalize() { + breakdown(); + } + + public boolean accept(Serializable msg, Member sender) { + if ( msg instanceof RpcMessage ) { + RpcMessage rmsg = (RpcMessage)msg; + return Arrays.equals(rmsg.rpcId,rpcId); + }else return false; + } + + public Channel getChannel() { + return channel; + } + + public RpcCallback getCallback() { + return callback; + } + + public byte[] getRpcId() { + return rpcId; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + + public void setCallback(RpcCallback callback) { + this.callback = callback; + } + + public void setRpcId(byte[] rpcId) { + this.rpcId = rpcId; + } + + + + /** + * + * Class that holds all response. + * @author not attributable + * @version 1.0 + */ + public static class RpcCollector { + public ArrayList responses = new ArrayList(); + public RpcCollectorKey key; + public int options; + public int destcnt; + public long timeout; + + public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) { + this.key = key; + this.options = options; + this.destcnt = destcnt; + this.timeout = timeout; + } + + public void addResponse(Serializable message, Member sender){ + Response resp = new Response(sender,message); + responses.add(resp); + } + + public boolean isComplete() { + switch (options) { + case ALL_REPLY: + return destcnt == responses.size(); + case MAJORITY_REPLY: + { + float perc = ((float)responses.size()) / ((float)destcnt); + return perc >= 0.50f; + } + case FIRST_REPLY: + return responses.size()>0; + default: + return false; + } + } + + public int hashCode() { + return key.hashCode(); + } + + public boolean equals(Object o) { + if ( o instanceof RpcCollector ) { + RpcCollector r = (RpcCollector)o; + return r.key.equals(this.key); + } else return false; + } + + public Response[] getResponses() { + return (Response[])responses.toArray(new Response[responses.size()]); + } + } + + public static class RpcCollectorKey { + byte[] id; + public RpcCollectorKey(byte[] id) { + this.id = id; + } + + public int hashCode() { + return id[0]+id[1]+id[2]+id[3]; + } + + public boolean equals(Object o) { + if ( o instanceof RpcCollectorKey ) { + RpcCollectorKey r = (RpcCollectorKey)o; + return Arrays.equals(id,r.id); + } else return false; + } + + } + +} \ No newline at end of file Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java?rev=394147&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java Fri Apr 14 10:24:08 2006 @@ -0,0 +1,74 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.group; + +import java.io.ObjectInput; +import java.io.Serializable; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectOutput; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class RpcMessage implements Externalizable { + + protected Serializable message; + protected byte[] uuid; + protected byte[] rpcId; + protected boolean reply = false; + + public RpcMessage() { + //for serialization + } + + public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { + this.rpcId = rpcId; + this.uuid = uuid; + this.message = message; + } + + public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + reply = in.readBoolean(); + int length = in.readInt(); + uuid = new byte[length]; + in.read(uuid, 0, length); + length = in.readInt(); + rpcId = new byte[length]; + in.read(rpcId, 0, length); + message = (Serializable)in.readObject(); + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(reply); + out.writeInt(uuid.length); + out.write(uuid, 0, uuid.length); + out.writeInt(rpcId.length); + out.write(rpcId, 0, rpcId.length); + out.writeObject(message); + } + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=394147&r1=394146&r2=394147&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Fri Apr 14 10:24:08 2006 @@ -37,6 +37,7 @@ import org.apache.catalina.tribes.membership.MemberImpl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.catalina.tribes.group.*; /** * <p>Title: </p> Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=394147&r1=394146&r2=394147&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Apr 14 10:24:08 2006 @@ -29,6 +29,7 @@ import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.*; /** * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=394147&r1=394146&r2=394147&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Fri Apr 14 10:24:08 2006 @@ -33,7 +33,7 @@ import org.apache.catalina.tribes.transport.AbstractSender; import java.net.UnknownHostException; import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.tipis.RpcChannel; +import org.apache.catalina.tribes.group.RpcChannel; /** * <p>Title: </p> Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java?rev=394147&r1=394146&r2=394147&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Fri Apr 14 10:24:08 2006 @@ -6,7 +6,7 @@ import org.apache.catalina.tribes.tipis.RpcCallback; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ManagedChannel; -import org.apache.catalina.tribes.tipis.RpcChannel; +import org.apache.catalina.tribes.group.RpcChannel; import org.apache.catalina.tribes.tipis.Response; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]