Author: fhanik
Date: Wed Feb 22 13:16:25 2006
New Revision: 379904
URL: http://svn.apache.org/viewcvs?rev=379904&view=rev
Log:
Completed a first version of the independent GroupChannel, still need to remove
all the JMX stuff from the core components, JMX should be monitoring using
outside beans, not be baked into the code
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
Wed Feb 22 13:16:25 2006
@@ -59,5 +59,8 @@
* @return The port
*/
public int getPort();
+
+ public void setMessageListener(MessageListener listener);
+ public MessageListener getMessageListener();
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
Wed Feb 22 13:16:25 2006
@@ -22,6 +22,7 @@
import org.apache.catalina.cluster.ClusterSender;
import org.apache.catalina.cluster.ClusterReceiver;
import org.apache.catalina.cluster.ClusterChannel;
+import java.io.IOException;
/**
@@ -36,8 +37,19 @@
private ClusterSender clusterSender;
private MembershipService membershipService;
+ public ChannelCoordinator() {
+
+ }
+
+ public ChannelCoordinator(ClusterReceiver receiver,
+ ClusterSender sender,
+ MembershipService service) {
+ this();
+ this.setClusterReceiver(receiver);
+ this.setClusterSender(sender);
+ this.setMembershipService(service);
+ }
-
/**
* Send a message to one or more members in the cluster
* @param destination Member[] - the destinations, null or zero length
means all
@@ -45,9 +57,12 @@
* @param options int - sender options, see class documentation
* @return ClusterMessage[] - the replies from the members, if any.
*/
- public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage
msg, int options) {
- throw new UnsupportedOperationException();
- //implement sending and receiving logic.
+ public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage
msg, int options) throws IOException {
+ if ( destination == null ) destination =
membershipService.getMembers();
+ for ( int i=0; i<destination.length; i++ ) {
+ clusterSender.sendMessage(msg,destination[i]);
+ }
+ return null;
}
@@ -97,6 +112,17 @@
}
}
+
+ public void memberAdded(Member member){
+ if ( clusterSender!=null ) clusterSender.add(member);
+ super.memberAdded(member);
+ }
+
+ public void memberDisappeared(Member member){
+ if ( clusterSender!=null ) clusterSender.remove(member);
+ super.memberDisappeared(member);
+ }
+
public ClusterReceiver getClusterReceiver() {
return clusterReceiver;
@@ -111,7 +137,13 @@
}
public void setClusterReceiver(ClusterReceiver clusterReceiver) {
- this.clusterReceiver = clusterReceiver;
+ if ( clusterReceiver != null ) {
+ this.clusterReceiver = clusterReceiver;
+ this.clusterReceiver.setMessageListener(this);
+ } else {
+ if (this.clusterReceiver!=null )
this.clusterReceiver.setMessageListener(null);
+ this.clusterReceiver = null;
+ }
}
public void setClusterSender(ClusterSender clusterSender) {
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
Wed Feb 22 13:16:25 2006
@@ -19,6 +19,7 @@
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.MembershipListener;
import org.apache.catalina.cluster.MessageListener;
+import java.io.IOException;
/**
* Abstract class for the interceptor base class.
@@ -51,7 +52,7 @@
return previous;
}
- public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage
msg, int options) {
+ public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage
msg, int options) throws IOException {
return getNext().sendMessage(destination, msg,options);
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
Wed Feb 22 13:16:25 2006
@@ -71,7 +71,17 @@
* @return ClusterMessage[] - the replies from the members, if any.
*/
public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int
options) throws ChannelException {
- throw new UnsupportedOperationException("Method send not yet
implemented.");
+ if ( msg == null ) return null;
+ msg.setAddress(getMembershipService().getLocalMember());
+ msg.setCompress(msg.FLAG_ALLOWED);
+ msg.setTimestamp(System.currentTimeMillis());
+ msg.setResend(msg.FLAG_FORBIDDEN);
+ try {
+ if (interceptors != null)return
interceptors.sendMessage(destination, msg, options);
+ else return this.coordinator.sendMessage(destination, msg,
options);
+ }catch ( Exception x ) {
+ throw new ChannelException(x);
+ }
}
/**
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Wed Feb 22 13:16:25 2006
@@ -16,6 +16,7 @@
package org.apache.catalina.cluster.tcp;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectableChannel;
@@ -25,13 +26,14 @@
import java.nio.channels.SocketChannel;
import java.util.Iterator;
-import org.apache.catalina.cluster.io.ObjectReader;
-import org.apache.catalina.cluster.io.ListenCallback;
-import org.apache.catalina.cluster.ClusterReceiver;
-import org.apache.catalina.util.StringManager;
-import java.io.IOException;
import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.group.ChannelInterceptorBase;
+import org.apache.catalina.cluster.io.ListenCallback;
+import org.apache.catalina.cluster.io.ObjectReader;
import org.apache.catalina.cluster.io.XByteBuffer;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.cluster.MessageListener;
/**
* @author Filip Hanik
@@ -70,7 +72,7 @@
private Object interestOpsMutex = new Object();
-
+ private MessageListener listener = null;
public ReplicationListener() {
}
@@ -309,7 +311,20 @@
}
public void messageDataReceived(ClusterData data) {
- //nothing to do yet
+ if ( this.listener != null ) {
+ try {
+ ClusterMessage msg = deserialize(data);
+ listener.messageReceived(msg);
+ }catch ( java.io.IOException x ) {
+ if ( log.isErrorEnabled() ) {
+ log.error("Unable to receive and deserialize cluster data.
IOException.",x);
+ }
+ }catch ( java.lang.ClassNotFoundException cx ) {
+ if ( log.isErrorEnabled() ) {
+ log.error("Unable to receive and deserialize cluster data.
ClassNotFoundException.",cx);
+ }
+ }
+ }
}
/**
@@ -385,8 +400,16 @@
return tcpListenPort;
}
+ public MessageListener getMessageListener() {
+ return listener;
+ }
+
public void setTcpListenPort(int tcpListenPort) {
this.tcpListenPort = tcpListenPort;
+ }
+
+ public void setMessageListener(MessageListener listener) {
+ this.listener = listener;
}
public String getHost() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]