Author: fhanik
Date: Fri Apr 14 11:38:48 2006
New Revision: 394168
URL: http://svn.apache.org/viewcvs?rev=394168&view=rev
Log:
Work in progress for no-timeout membership config
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=394168&r1=394167&r2=394168&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Fri Apr 14 11:38:48 2006
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.io.ObjectOutput;
+import org.apache.catalina.tribes.Channel;
/**
* The GroupChannel manages the replication channel. It coordinates
@@ -144,18 +145,38 @@
fwd =
XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
}
//get the actual member with the correct alive time
- Member source = msg.getAddress();
+ System.out.println("Received msg:"+fwd.getClass().getName());
+ Member source = msg.getAddress();
+ boolean rx = false;
for ( int i=0; i<channelListeners.size(); i++ ) {
ChannelListener channelListener =
(ChannelListener)channelListeners.get(i);
- if (channelListener != null && channelListener.accept(fwd,
source))
+ System.out.println("Listener:"+channelListener);
+ if (channelListener != null && channelListener.accept(fwd,
source)) {
+ System.out.println("Setting rx=true");
channelListener.messageReceived(fwd, source);
+ rx = true;
+ }
}//for
+ System.out.println("RX="+rx);
+ if ((!rx) && (msg instanceof RpcMessage)) {
+ System.out.println("Sending RPC NO REPLY");
+ sendNoRpcChannelReply((RpcMessage)fwd,source);
+ }
} catch ( Exception x ) {
log.error("Unable to deserialize channel message.",x);
}
}
+ protected void sendNoRpcChannelReply(RpcMessage msg, Member source) {
+ try {
+ RpcMessage.NoRpcChannelReply reply = new
RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
+ send(new Member[]{source},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ } catch ( Exception x ) {
+ log.error("Unable to find rpc channel, failed to send
NoRpcChannelReply.",x);
+ }
+ }
+
public void memberAdded(Member member) {
//notify upwards
for (int i=0; i<membershipListeners.size(); i++ ) {
@@ -293,12 +314,6 @@
}
- public static class NoChannelReply extends RpcMessage {
- public void readExternal(ObjectInput in) throws
IOException,ClassNotFoundException {
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- }
- }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=394168&r1=394167&r2=394168&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
Fri Apr 14 11:38:48 2006
@@ -142,6 +142,7 @@
public boolean accept(Serializable msg, Member sender) {
if ( msg instanceof RpcMessage ) {
RpcMessage rmsg = (RpcMessage)msg;
+ System.out.println("Accept:"+bToS(this.rpcId)+"
other:"+bToS(rmsg.rpcId)+" value:"+Arrays.equals(rmsg.rpcId,rpcId));
return Arrays.equals(rmsg.rpcId,rpcId);
}else return false;
}
@@ -194,10 +195,17 @@
public void addResponse(Serializable message, Member sender){
Response resp = new Response(sender,message);
- responses.add(resp);
+ synchronized (responses) {
+ if ( (message instanceof RpcMessage.NoRpcChannelReply) )
+ destcnt--;
+ else
+ responses.add(resp);
+ }
+
}
public boolean isComplete() {
+ if ( destcnt <= 0 ) return true;
switch (options) {
case ALL_REPLY:
return destcnt == responses.size();
@@ -247,5 +255,14 @@
}
}
+
+ protected static String bToS(byte[] data) {
+ StringBuffer buf = new StringBuffer(4*16);
+ buf.append("{");
+ for (int i=0; data!=null && i<data.length; i++ )
buf.append(String.valueOf(data[i])).append(" ");
+ buf.append("}");
+ return buf.toString();
+ }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java?rev=394168&r1=394167&r2=394168&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
Fri Apr 14 11:38:48 2006
@@ -70,5 +70,30 @@
out.write(rpcId, 0, rpcId.length);
out.writeObject(message);
}
+
+ public static class NoRpcChannelReply extends RpcMessage {
+ public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
+ super(rpcid,uuid,null);
+ reply = true;
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ reply = true;
+ int length = in.readInt();
+ uuid = new byte[length];
+ in.read(uuid, 0, length);
+ length = in.readInt();
+ rpcId = new byte[length];
+ in.read(rpcId, 0, length);
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(uuid.length);
+ out.write(uuid, 0, uuid.length);
+ out.writeInt(rpcId.length);
+ out.write(rpcId, 0, rpcId.length);
+ }
+ }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=394168&r1=394167&r2=394168&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
Fri Apr 14 11:38:48 2006
@@ -56,11 +56,12 @@
}
public boolean accept(Serializable msg, Member source) {
- return true;
+ table.dataModel.getValueAt(-1,-1);
+ return false;
}
public void messageReceived(Serializable msg, Member source) {
- table.dataModel.getValueAt(-1,-1);
+
}
public void memberAdded(Member member) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]