Author: fhanik
Date: Fri Apr 14 10:24:08 2006
New Revision: 394147

URL: http://svn.apache.org/viewcvs?rev=394147&view=rev
Log:
Moved RpcChannel to the group implementation

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
Removed:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=394147&r1=394146&r2=394147&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Fri Apr 14 10:24:08 2006
@@ -34,6 +34,11 @@
 import org.apache.catalina.tribes.MembershipService;
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import java.io.ObjectInput;
+import java.io.Externalizable;
+
+import java.io.IOException;
+import java.io.ObjectOutput;
 
 /**
  * The GroupChannel manages the replication channel. It coordinates
@@ -286,5 +291,14 @@
     public boolean getOptionCheck() {
         return optionCheck;
     }
+    
+    
+    public static class NoChannelReply extends RpcMessage {
+        public void readExternal(ObjectInput in) throws 
IOException,ClassNotFoundException {
+        }
+
+        public void writeExternal(ObjectOutput out) throws IOException {
+        }
+    }    
 
 }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=394147&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
 Fri Apr 14 10:24:08 2006
@@ -0,0 +1,251 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.tipis.*;
+
+/**
+ * A channel to handle RPC messaging
+ * @author Filip Hanik
+ */
+public class RpcChannel implements ChannelListener{
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(RpcChannel.class);
+    
+    public static final int FIRST_REPLY = 1;
+    public static final int MAJORITY_REPLY = 2;
+    public static final int ALL_REPLY = 3;
+    
+    private Channel channel;
+    private RpcCallback callback;
+    private byte[] rpcId;
+    
+    private HashMap responseMap = new HashMap();
+
+    /**
+     * Create an RPC channel. You can have several RPC channels attached to a 
group
+     * all separated out by the uniqueness
+     * @param rpcId - the unique Id for this RPC group
+     * @param channel Channel
+     * @param callback RpcCallback
+     */
+    public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
+        this.channel = channel;
+        this.callback = callback;
+        this.rpcId = rpcId;
+        channel.addChannelListener(this);
+    }
+    
+    
+    /**
+     * Send a message and wait for the response.
+     * @param destination Member[] - the destination for the message, and the 
members you request a reply from
+     * @param message Serializable - the message you are sending out
+     * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
+     * @param timeout long - timeout in milliseconds, if no reply is received 
within this time null is returned
+     * @return Response[] - an array of response objects.
+     * @throws ChannelException
+     */
+    public Response[] send(Member[] destination, 
+                           Serializable message,
+                           int rpcOptions, 
+                           int channelOptions,
+                           long timeout) throws ChannelException {
+        
+        if ( destination==null || destination.length == 0 ) return new 
Response[0];
+        
+        //avoid dead lock
+        channelOptions = channelOptions & 
~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+        
+        RpcCollectorKey key = new 
RpcCollectorKey(UUIDGenerator.randomUUID(false));
+        RpcCollector collector = new 
RpcCollector(key,rpcOptions,destination.length,timeout);
+        try {
+            synchronized (collector) {
+                responseMap.put(key, collector);
+                RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
+                channel.send(destination, rmsg, channelOptions);
+                collector.wait(timeout);
+            }
+        } catch ( InterruptedException ix ) {
+            Thread.currentThread().interrupted();
+            throw new ChannelException(ix);
+        }finally {
+            responseMap.remove(key);
+        }
+        return collector.getResponses();
+    }
+    
+    public void messageReceived(Serializable msg, Member sender) {
+        RpcMessage rmsg = (RpcMessage)msg;
+        RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
+        if ( rmsg.reply ) {
+            RpcCollector collector = (RpcCollector)responseMap.get(key);
+            if (collector == null) {
+                callback.leftOver(rmsg.message, sender);
+            } else {
+                synchronized (collector) {
+                    //make sure it hasn't been removed
+                    if ( responseMap.containsKey(key) ) {
+                        collector.addResponse(rmsg.message, sender);
+                        if (collector.isComplete()) collector.notifyAll();
+                    } else {
+                        callback.leftOver(rmsg.message, sender);
+                    }
+                }//synchronized
+            }//end if
+        } else{
+            Serializable reply = callback.replyRequest(rmsg.message,sender);
+            rmsg.reply = true;
+            rmsg.message = reply;
+            try {
+                channel.send(new Member[] {sender}, rmsg,0);
+            }catch ( Exception x )  {
+                log.error("Unable to send back reply in RpcChannel.",x);
+            }
+        }//end if
+    }
+    
+    public void breakdown() {
+        channel.removeChannelListener(this);
+    }
+    
+    public void finalize() {
+        breakdown();
+    }
+    
+    public boolean accept(Serializable msg, Member sender) {
+        if ( msg instanceof RpcMessage ) {
+            RpcMessage rmsg = (RpcMessage)msg;
+            return Arrays.equals(rmsg.rpcId,rpcId);
+        }else return false;
+    }
+    
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public RpcCallback getCallback() {
+        return callback;
+    }
+
+    public byte[] getRpcId() {
+        return rpcId;
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void setCallback(RpcCallback callback) {
+        this.callback = callback;
+    }
+
+    public void setRpcId(byte[] rpcId) {
+        this.rpcId = rpcId;
+    }
+    
+
+
+    /**
+     * 
+     * Class that holds all response.
+     * @author not attributable
+     * @version 1.0
+     */
+    public static class RpcCollector {
+        public ArrayList responses = new ArrayList(); 
+        public RpcCollectorKey key;
+        public int options;
+        public int destcnt;
+        public long timeout;
+        
+        public RpcCollector(RpcCollectorKey key, int options, int destcnt, 
long timeout) {
+            this.key = key;
+            this.options = options;
+            this.destcnt = destcnt;
+            this.timeout = timeout;
+        }
+        
+        public void addResponse(Serializable message, Member sender){
+            Response resp = new Response(sender,message);
+            responses.add(resp);
+        }
+        
+        public boolean isComplete() {
+            switch (options) {
+                case ALL_REPLY:
+                    return destcnt == responses.size();
+                case MAJORITY_REPLY:
+                {
+                    float perc = ((float)responses.size()) / ((float)destcnt);
+                    return perc >= 0.50f;
+                }
+                case FIRST_REPLY:
+                    return responses.size()>0;
+                default:
+                    return false;
+            }
+        }
+        
+        public int hashCode() {
+            return key.hashCode();
+        }
+        
+        public boolean equals(Object o) {
+            if ( o instanceof RpcCollector ) {
+                RpcCollector r = (RpcCollector)o;
+                return r.key.equals(this.key);
+            } else return false;
+        }
+        
+        public Response[] getResponses() {
+            return (Response[])responses.toArray(new 
Response[responses.size()]);
+        }
+    }
+    
+    public static class RpcCollectorKey {
+        byte[] id;
+        public RpcCollectorKey(byte[] id) {
+            this.id = id;
+        }
+        
+        public int hashCode() {
+            return id[0]+id[1]+id[2]+id[3];
+        }
+
+        public boolean equals(Object o) {
+            if ( o instanceof RpcCollectorKey ) {
+                RpcCollectorKey r = (RpcCollectorKey)o;
+                return Arrays.equals(id,r.id);
+            } else return false;
+        }
+        
+    }
+
+}
\ No newline at end of file

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java?rev=394147&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
 Fri Apr 14 10:24:08 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group;
+
+import java.io.ObjectInput;
+import java.io.Serializable;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectOutput;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class RpcMessage implements Externalizable {
+
+    protected Serializable message;
+    protected byte[] uuid;
+    protected byte[] rpcId;
+    protected boolean reply = false;
+
+    public RpcMessage() {
+        //for serialization
+    }
+
+    public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
+        this.rpcId = rpcId;
+        this.uuid = uuid;
+        this.message = message;
+    }
+
+    public void readExternal(ObjectInput in) throws 
IOException,ClassNotFoundException {
+        reply = in.readBoolean();
+        int length = in.readInt();
+        uuid = new byte[length];
+        in.read(uuid, 0, length);
+        length = in.readInt();
+        rpcId = new byte[length];
+        in.read(rpcId, 0, length);
+        message = (Serializable)in.readObject();
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeBoolean(reply);
+        out.writeInt(uuid.length);
+        out.write(uuid, 0, uuid.length);
+        out.writeInt(rpcId.length);
+        out.write(rpcId, 0, rpcId.length);
+        out.writeObject(message);
+    }
+
+}

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=394147&r1=394146&r2=394147&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 Fri Apr 14 10:24:08 2006
@@ -37,6 +37,7 @@
 import org.apache.catalina.tribes.membership.MemberImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.catalina.tribes.group.*;
 
 /**
  * <p>Title: </p>

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=394147&r1=394146&r2=394147&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Fri Apr 14 10:24:08 2006
@@ -29,6 +29,7 @@
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.*;
 
 /**
  * A smart implementation of a stateful replicated map. uses primary/secondary 
backup strategy. 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=394147&r1=394146&r2=394147&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
 Fri Apr 14 10:24:08 2006
@@ -33,7 +33,7 @@
 import org.apache.catalina.tribes.transport.AbstractSender;
 import java.net.UnknownHostException;
 import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.tipis.RpcChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
 
 /**
  * <p>Title: </p>

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java?rev=394147&r1=394146&r2=394147&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
 Fri Apr 14 10:24:08 2006
@@ -6,7 +6,7 @@
 import org.apache.catalina.tribes.tipis.RpcCallback;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.tipis.RpcChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.tipis.Response;
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to