This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push:
new 041ac35e83 Code clean-up - no functional change
041ac35e83 is described below
commit 041ac35e83915383bb285e08a275eb95954c5a63
Author: Mark Thomas <[email protected]>
AuthorDate: Fri May 10 13:45:31 2024 +0100
Code clean-up - no functional change
---
.../catalina/tribes/group/AbsoluteOrder.java | 62 ++---
.../catalina/tribes/group/ChannelCoordinator.java | 147 ++++++------
.../tribes/group/ChannelInterceptorBase.java | 30 +--
.../catalina/tribes/group/ExtendedRpcCallback.java | 18 +-
.../apache/catalina/tribes/group/GroupChannel.java | 257 ++++++++++-----------
.../catalina/tribes/group/GroupChannelMBean.java | 10 +-
.../catalina/tribes/group/InterceptorPayload.java | 2 +-
.../org/apache/catalina/tribes/group/Response.java | 1 +
.../apache/catalina/tribes/group/RpcCallback.java | 15 +-
.../apache/catalina/tribes/group/RpcChannel.java | 111 ++++-----
.../apache/catalina/tribes/group/RpcMessage.java | 8 +-
11 files changed, 328 insertions(+), 333 deletions(-)
diff --git a/java/org/apache/catalina/tribes/group/AbsoluteOrder.java
b/java/org/apache/catalina/tribes/group/AbsoluteOrder.java
index 1e06dc1f09..df889928e7 100644
--- a/java/org/apache/catalina/tribes/group/AbsoluteOrder.java
+++ b/java/org/apache/catalina/tribes/group/AbsoluteOrder.java
@@ -24,19 +24,22 @@ import java.util.List;
import org.apache.catalina.tribes.Member;
/**
- * <p>Title: Membership - Absolute Order</p>
- *
- * <p>Description: A simple, yet agreeable and efficient way of ordering
members</p>
* <p>
- * Ordering members can serve as a basis for electing a leader or
coordinating efforts.<br>
- * This is stinky simple, it works on the basis of the <code>Member</code>
interface
- * and orders members in the following format:
+ * Title: Membership - Absolute Order
+ * </p>
+ * <p>
+ * Description: A simple, yet agreeable and efficient way of ordering members
+ * </p>
+ * <p>
+ * Ordering members can serve as a basis for electing a leader or coordinating
efforts.<br>
+ * This is stinky simple, it works on the basis of the <code>Member</code>
interface and orders members in the following
+ * format:
* </p>
* <ol>
- * <li>IP comparison - byte by byte, lower byte higher rank</li>
- * <li>IPv4 addresses rank higher than IPv6, ie the lesser number of
bytes, the higher rank</li>
- * <li>Port comparison - lower port, higher rank</li>
- * <li>UniqueId comparison- byte by byte, lower byte higher rank</li>
+ * <li>IP comparison - byte by byte, lower byte higher rank</li>
+ * <li>IPv4 addresses rank higher than IPv6, ie the lesser number of bytes,
the higher rank</li>
+ * <li>Port comparison - lower port, higher rank</li>
+ * <li>UniqueId comparison- byte by byte, lower byte higher rank</li>
* </ol>
*
* @see org.apache.catalina.tribes.Member
@@ -50,55 +53,54 @@ public class AbsoluteOrder {
public static void absoluteOrder(Member[] members) {
- if ( members == null || members.length <= 1 ) {
+ if (members == null || members.length <= 1) {
return;
}
- Arrays.sort(members,comp);
+ Arrays.sort(members, comp);
}
public static void absoluteOrder(List<Member> members) {
- if ( members == null || members.size() <= 1 ) {
+ if (members == null || members.size() <= 1) {
return;
}
members.sort(comp);
}
- public static class AbsoluteComparator implements Comparator<Member>,
- Serializable {
+ public static class AbsoluteComparator implements Comparator<Member>,
Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(Member m1, Member m2) {
- int result = compareIps(m1,m2);
- if ( result == 0 ) {
- result = comparePorts(m1,m2);
+ int result = compareIps(m1, m2);
+ if (result == 0) {
+ result = comparePorts(m1, m2);
}
- if ( result == 0 ) {
- result = compareIds(m1,m2);
+ if (result == 0) {
+ result = compareIds(m1, m2);
}
return result;
}
public int compareIps(Member m1, Member m2) {
- return compareBytes(m1.getHost(),m2.getHost());
+ return compareBytes(m1.getHost(), m2.getHost());
}
public int comparePorts(Member m1, Member m2) {
- return compareInts(m1.getPort(),m2.getPort());
+ return compareInts(m1.getPort(), m2.getPort());
}
public int compareIds(Member m1, Member m2) {
- return compareBytes(m1.getUniqueId(),m2.getUniqueId());
+ return compareBytes(m1.getUniqueId(), m2.getUniqueId());
}
protected int compareBytes(byte[] d1, byte[] d2) {
int result = 0;
- if ( d1.length == d2.length ) {
- for (int i=0; (result==0) && (i<d1.length); i++) {
- result = compareBytes(d1[i],d2[i]);
+ if (d1.length == d2.length) {
+ for (int i = 0; (result == 0) && (i < d1.length); i++) {
+ result = compareBytes(d1[i], d2[i]);
}
- } else if ( d1.length < d2.length) {
+ } else if (d1.length < d2.length) {
result = -1;
} else {
result = 1;
@@ -107,14 +109,14 @@ public class AbsoluteOrder {
}
protected int compareBytes(byte b1, byte b2) {
- return compareInts(b1,b2);
+ return compareInts(b1, b2);
}
protected int compareInts(int b1, int b2) {
int result = 0;
- if ( b1 == b2 ) {
+ if (b1 == b2) {
- } else if ( b1 < b2) {
+ } else if (b1 < b2) {
result = -1;
} else {
result = 1;
diff --git a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java
b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java
index 3f93572a77..bd744c1a27 100644
--- a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java
+++ b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java
@@ -35,9 +35,8 @@ import org.apache.catalina.tribes.util.Logs;
import org.apache.catalina.tribes.util.StringManager;
/**
- * The channel coordinator object coordinates the membership service,
- * the sender and the receiver.
- * This is the last interceptor in the chain.
+ * The channel coordinator object coordinates the membership service, the
sender and the receiver. This is the last
+ * interceptor in the chain.
*/
public class ChannelCoordinator extends ChannelInterceptorBase implements
MessageListener {
protected static final StringManager sm =
StringManager.getManager(ChannelCoordinator.class);
@@ -48,16 +47,12 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
private int startLevel = 0;
public ChannelCoordinator() {
- this(new NioReceiver(), new ReplicationTransmitter(),
- new McastService());
+ this(new NioReceiver(), new ReplicationTransmitter(), new
McastService());
}
- public ChannelCoordinator(ChannelReceiver receiver,
- ChannelSender sender,
- MembershipService service) {
+ public ChannelCoordinator(ChannelReceiver receiver, ChannelSender sender,
MembershipService service) {
- this.optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE |
- Channel.SEND_OPTIONS_USE_ACK |
+ this.optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE |
Channel.SEND_OPTIONS_USE_ACK |
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
this.setClusterReceiver(receiver);
@@ -67,25 +62,25 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
/**
* Send a message to one or more members in the cluster
+ *
* @param destination Member[] - the destinations, null or zero length
means all
- * @param msg ClusterMessage - the message to send
- * @param payload TBA
+ * @param msg ClusterMessage - the message to send
+ * @param payload TBA
*/
@Override
public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload)
throws ChannelException {
- if ( destination == null ) {
+ if (destination == null) {
destination = membershipService.getMembers();
}
- if ((msg.getOptions()&Channel.SEND_OPTIONS_MULTICAST) ==
Channel.SEND_OPTIONS_MULTICAST) {
+ if ((msg.getOptions() & Channel.SEND_OPTIONS_MULTICAST) ==
Channel.SEND_OPTIONS_MULTICAST) {
membershipService.broadcast(msg);
} else {
- clusterSender.sendMessage(msg,destination);
+ clusterSender.sendMessage(msg, destination);
}
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new
UniqueId(msg.getUniqueId()) +
- " at " + new
java.sql.Timestamp(System.currentTimeMillis()) + " to " +
- Arrays.toNameString(destination));
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new
UniqueId(msg.getUniqueId()) + " at " +
+ new java.sql.Timestamp(System.currentTimeMillis()) + " to
" + Arrays.toNameString(destination));
}
}
@@ -102,75 +97,73 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
/**
- * Starts up the channel. This can be called multiple times for individual
services to start
- * The svc parameter can be the logical or value of any constants
+ * Starts up the channel. This can be called multiple times for individual
services to start The svc parameter can
+ * be the logical or value of any constants
+ *
* @param svc int value of <BR>
- * DEFAULT - will start all services <BR>
- * MBR_RX_SEQ - starts the membership receiver <BR>
- * MBR_TX_SEQ - starts the membership broadcaster <BR>
- * SND_TX_SEQ - starts the replication transmitter<BR>
- * SND_RX_SEQ - starts the replication receiver<BR>
+ * DEFAULT - will start all services <BR>
+ * MBR_RX_SEQ - starts the membership receiver <BR>
+ * MBR_TX_SEQ - starts the membership broadcaster <BR>
+ * SND_TX_SEQ - starts the replication transmitter<BR>
+ * SND_RX_SEQ - starts the replication receiver<BR>
+ *
* @throws ChannelException if a startup error occurs or the service is
already started.
*/
protected synchronized void internalStart(int svc) throws ChannelException
{
try {
boolean valid = false;
- //make sure we don't pass down any flags that are unrelated to the
bottom layer
+ // make sure we don't pass down any flags that are unrelated to
the bottom layer
svc = svc & Channel.DEFAULT;
- if (startLevel == Channel.DEFAULT)
- {
- return; //we have already started up all components
+ if (startLevel == Channel.DEFAULT) {
+ return; // we have already started up all components
}
- if (svc == 0 )
- {
- return;//nothing to start
+ if (svc == 0) {
+ return;// nothing to start
}
if (svc == (svc & startLevel)) {
- throw new
ChannelException(sm.getString("channelCoordinator.alreadyStarted",
- Integer.toString(svc)));
+ throw new
ChannelException(sm.getString("channelCoordinator.alreadyStarted",
Integer.toString(svc)));
}
- //must start the receiver first so that we can coordinate the port
it
- //listens to with the local membership settings
- if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
+ // must start the receiver first so that we can coordinate the
port it
+ // listens to with the local membership settings
+ if (Channel.SND_RX_SEQ == (svc & Channel.SND_RX_SEQ)) {
clusterReceiver.setMessageListener(this);
clusterReceiver.setChannel(getChannel());
clusterReceiver.start();
- //synchronize, big time FIXME
+ // synchronize, big time FIXME
Member localMember = getChannel().getLocalMember(false);
if (localMember instanceof StaticMember) {
// static member
- StaticMember staticMember = (StaticMember)localMember;
+ StaticMember staticMember = (StaticMember) localMember;
staticMember.setHost(getClusterReceiver().getHost());
staticMember.setPort(getClusterReceiver().getPort());
staticMember.setSecurePort(getClusterReceiver().getSecurePort());
} else {
// multicast member
membershipService.setLocalMemberProperties(getClusterReceiver().getHost(),
- getClusterReceiver().getPort(),
- getClusterReceiver().getSecurePort(),
+ getClusterReceiver().getPort(),
getClusterReceiver().getSecurePort(),
getClusterReceiver().getUdpPort());
}
valid = true;
}
- if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
+ if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) {
clusterSender.setChannel(getChannel());
clusterSender.start();
valid = true;
}
- if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
+ if (Channel.MBR_RX_SEQ == (svc & Channel.MBR_RX_SEQ)) {
membershipService.setMembershipListener(this);
membershipService.setChannel(getChannel());
if (membershipService instanceof McastService) {
- ((McastService)membershipService).setMessageListener(this);
+ ((McastService)
membershipService).setMessageListener(this);
}
membershipService.start(MembershipService.MBR_RX);
valid = true;
}
- if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
+ if (Channel.MBR_TX_SEQ == (svc & Channel.MBR_TX_SEQ)) {
membershipService.setChannel(getChannel());
membershipService.start(MembershipService.MBR_TX);
valid = true;
@@ -180,59 +173,59 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
throw new
IllegalArgumentException(sm.getString("channelCoordinator.invalid.startLevel"));
}
startLevel = (startLevel | svc);
- }catch ( ChannelException cx ) {
+ } catch (ChannelException cx) {
throw cx;
- }catch ( Exception x ) {
+ } catch (Exception x) {
throw new ChannelException(x);
}
}
/**
- * Shuts down the channel. This can be called multiple times for
individual services to shutdown
- * The svc parameter can be the logical or value of any constants
+ * Shuts down the channel. This can be called multiple times for
individual services to shutdown The svc parameter
+ * can be the logical or value of any constants
+ *
* @param svc int value of <BR>
- * DEFAULT - will shutdown all services <BR>
- * MBR_RX_SEQ - starts the membership receiver <BR>
- * MBR_TX_SEQ - starts the membership broadcaster <BR>
- * SND_TX_SEQ - starts the replication transmitter<BR>
- * SND_RX_SEQ - starts the replication receiver<BR>
+ * DEFAULT - will shutdown all services <BR>
+ * MBR_RX_SEQ - starts the membership receiver <BR>
+ * MBR_TX_SEQ - starts the membership broadcaster <BR>
+ * SND_TX_SEQ - starts the replication transmitter<BR>
+ * SND_RX_SEQ - starts the replication receiver<BR>
+ *
* @throws ChannelException if a startup error occurs or the service is
already started.
*/
protected synchronized void internalStop(int svc) throws ChannelException {
try {
- //make sure we don't pass down any flags that are unrelated to the
bottom layer
+ // make sure we don't pass down any flags that are unrelated to
the bottom layer
svc = svc & Channel.DEFAULT;
- if (startLevel == 0)
- {
- return; //we have already stopped up all components
+ if (startLevel == 0) {
+ return; // we have already stopped up all components
}
- if (svc == 0 )
- {
- return;//nothing to stop
+ if (svc == 0) {
+ return;// nothing to stop
}
boolean valid = false;
- if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
+ if (Channel.MBR_TX_SEQ == (svc & Channel.MBR_TX_SEQ)) {
membershipService.stop(MembershipService.MBR_TX);
valid = true;
}
- if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
+ if (Channel.MBR_RX_SEQ == (svc & Channel.MBR_RX_SEQ)) {
membershipService.stop(MembershipService.MBR_RX);
membershipService.setMembershipListener(null);
valid = true;
}
- if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
+ if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) {
clusterSender.stop();
valid = true;
}
- if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
+ if (Channel.SND_RX_SEQ == (svc & Channel.SND_RX_SEQ)) {
clusterReceiver.stop();
clusterReceiver.setMessageListener(null);
valid = true;
}
- if ( !valid) {
+ if (!valid) {
throw new
IllegalArgumentException(sm.getString("channelCoordinator.invalid.startLevel"));
}
@@ -244,24 +237,22 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
}
@Override
- public void memberAdded(Member member){
+ public void memberAdded(Member member) {
SenderState.getSenderState(member);
super.memberAdded(member);
}
@Override
- public void memberDisappeared(Member member){
+ public void memberDisappeared(Member member) {
SenderState.removeSenderState(member);
super.memberDisappeared(member);
}
@Override
public void messageReceived(ChannelMessage msg) {
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("ChannelCoordinator - Received msg:" +
- new UniqueId(msg.getUniqueId()) + " at " +
- new java.sql.Timestamp(System.currentTimeMillis()) + "
from " +
- msg.getAddress().getName());
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("ChannelCoordinator - Received msg:" + new
UniqueId(msg.getUniqueId()) + " at " +
+ new java.sql.Timestamp(System.currentTimeMillis()) + "
from " + msg.getAddress().getName());
}
super.messageReceived(msg);
}
@@ -284,11 +275,11 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
}
public void setClusterReceiver(ChannelReceiver clusterReceiver) {
- if ( clusterReceiver != null ) {
+ if (clusterReceiver != null) {
this.clusterReceiver = clusterReceiver;
this.clusterReceiver.setMessageListener(this);
} else {
- if (this.clusterReceiver!=null ) {
+ if (this.clusterReceiver != null) {
this.clusterReceiver.setMessageListener(null);
}
this.clusterReceiver = null;
@@ -306,7 +297,7 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
@Override
public void heartbeat() {
- if ( clusterSender!=null ) {
+ if (clusterSender != null) {
clusterSender.heartbeat();
}
super.heartbeat();
@@ -323,7 +314,7 @@ public class ChannelCoordinator extends
ChannelInterceptorBase implements Messag
}
@Override
- public Member getMember(Member mbr){
+ public Member getMember(Member mbr) {
return this.getMembershipService().getMember(mbr);
}
diff --git a/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
b/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
index 7bb01d5202..fcb7db7925 100644
--- a/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
+++ b/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
@@ -33,7 +33,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
private ChannelInterceptor next;
private ChannelInterceptor previous;
private Channel channel;
- //default value, always process
+ // default value, always process
protected int optionFlag = 0;
/**
@@ -46,10 +46,10 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
}
public boolean okToProcess(int messageFlags) {
- if (this.optionFlag == 0 ) {
+ if (this.optionFlag == 0) {
return true;
}
- return ((optionFlag&messageFlags) == optionFlag);
+ return ((optionFlag & messageFlags) == optionFlag);
}
@Override
@@ -83,8 +83,8 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
}
@Override
- public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws
- ChannelException {
+ public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload)
+ throws ChannelException {
if (getNext() != null) {
getNext().sendMessage(destination, msg, payload);
}
@@ -99,7 +99,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public void memberAdded(Member member) {
- //notify upwards
+ // notify upwards
if (getPrevious() != null) {
getPrevious().memberAdded(member);
}
@@ -107,7 +107,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public void memberDisappeared(Member member) {
- //notify upwards
+ // notify upwards
if (getPrevious() != null) {
getPrevious().memberDisappeared(member);
}
@@ -122,7 +122,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public boolean hasMembers() {
- if ( getNext()!=null ) {
+ if (getNext() != null) {
return getNext().hasMembers();
} else {
return false;
@@ -131,7 +131,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public Member[] getMembers() {
- if ( getNext()!=null ) {
+ if (getNext() != null) {
return getNext().getMembers();
} else {
return null;
@@ -140,7 +140,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public Member getMember(Member mbr) {
- if ( getNext()!=null) {
+ if (getNext() != null) {
return getNext().getMember(mbr);
} else {
return null;
@@ -149,7 +149,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public Member getLocalMember(boolean incAlive) {
- if ( getNext()!=null ) {
+ if (getNext() != null) {
return getNext().getLocalMember(incAlive);
} else {
return null;
@@ -158,14 +158,14 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public void start(int svc) throws ChannelException {
- if ( getNext()!=null ) {
+ if (getNext() != null) {
getNext().start(svc);
}
// register jmx
JmxRegistry jmxRegistry = JmxRegistry.getRegistry(channel);
if (jmxRegistry != null) {
- this.oname = jmxRegistry.registerJmx(
- ",component=Interceptor,interceptorName=" +
getClass().getSimpleName(), this);
+ this.oname =
jmxRegistry.registerJmx(",component=Interceptor,interceptorName=" +
getClass().getSimpleName(),
+ this);
}
}
@@ -183,7 +183,7 @@ public abstract class ChannelInterceptorBase implements
ChannelInterceptor {
@Override
public void fireInterceptorEvent(InterceptorEvent event) {
- //empty operation
+ // empty operation
}
@Override
diff --git a/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java
b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java
index c5e460be8e..4902071f02 100644
--- a/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java
+++ b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java
@@ -19,27 +19,29 @@ package org.apache.catalina.tribes.group;
import java.io.Serializable;
import org.apache.catalina.tribes.Member;
+
/**
- * Extension to the {@link RpcCallback} interface. Allows an RPC messenger to
get a confirmation if the reply
- * was sent successfully to the original sender.
- *
+ * Extension to the {@link RpcCallback} interface. Allows an RPC messenger to
get a confirmation if the reply was sent
+ * successfully to the original sender.
*/
public interface ExtendedRpcCallback extends RpcCallback {
/**
* The reply failed.
- * @param request - the original message that requested the reply
+ *
+ * @param request - the original message that requested the reply
* @param response - the reply message to the original message
- * @param sender - the sender requested that reply
- * @param reason - the reason the reply failed
+ * @param sender - the sender requested that reply
+ * @param reason - the reason the reply failed
*/
void replyFailed(Serializable request, Serializable response, Member
sender, Exception reason);
/**
* The reply succeeded
- * @param request - the original message that requested the reply
+ *
+ * @param request - the original message that requested the reply
* @param response - the reply message to the original message
- * @param sender - the sender requested that reply
+ * @param sender - the sender requested that reply
*/
void replySucceeded(Serializable request, Serializable response, Member
sender);
}
diff --git a/java/org/apache/catalina/tribes/group/GroupChannel.java
b/java/org/apache/catalina/tribes/group/GroupChannel.java
index 7ad0eb036d..31f1c074c1 100644
--- a/java/org/apache/catalina/tribes/group/GroupChannel.java
+++ b/java/org/apache/catalina/tribes/group/GroupChannel.java
@@ -60,28 +60,26 @@ import org.apache.juli.logging.LogFactory;
/**
* The default implementation of a Channel.<br>
- * The GroupChannel manages the replication channel. It coordinates
- * message being sent and received with membership announcements.
- * The channel has an chain of interceptors that can modify the message or
perform other logic.<br>
+ * The GroupChannel manages the replication channel. It coordinates message
being sent and received with membership
+ * announcements. The channel has an chain of interceptors that can modify the
message or perform other logic.<br>
* It manages a complete group, both membership and replication.
*/
-public class GroupChannel extends ChannelInterceptorBase
- implements ManagedChannel, JmxChannel, GroupChannelMBean {
+public class GroupChannel extends ChannelInterceptorBase implements
ManagedChannel, JmxChannel, GroupChannelMBean {
private static final Log log = LogFactory.getLog(GroupChannel.class);
protected static final StringManager sm =
StringManager.getManager(GroupChannel.class);
/**
- * Flag to determine if the channel manages its own heartbeat
- * If set to true, the channel will start a local thread for the heart
beat.
+ * Flag to determine if the channel manages its own heartbeat If set to
true, the channel will start a local thread
+ * for the heart beat.
*/
protected boolean heartbeat = true;
/**
- * If <code>heartbeat == true</code> then how often do we want this
- * heartbeat to run. The default value is 5000 milliseconds.
+ * If <code>heartbeat == true</code> then how often do we want this
heartbeat to run. The default value is 5000
+ * milliseconds.
*/
- protected long heartbeatSleeptime = 5*1000;
+ protected long heartbeatSleeptime = 5 * 1000;
/**
* Internal heartbeat future
@@ -90,7 +88,7 @@ public class GroupChannel extends ChannelInterceptorBase
protected ScheduledFuture<?> monitorFuture;
/**
- * The <code>ChannelCoordinator</code> coordinates the bottom layer
components:<br>
+ * The <code>ChannelCoordinator</code> coordinates the bottom layer
components:<br>
* - MembershipService<br>
* - ChannelSender <br>
* - ChannelReceiver<br>
@@ -98,9 +96,8 @@ public class GroupChannel extends ChannelInterceptorBase
protected final ChannelCoordinator coordinator = new ChannelCoordinator();
/**
- * The first interceptor in the interceptor stack.
- * The interceptors are chained in a linked list, so we only need a
reference to the
- * first one
+ * The first interceptor in the interceptor stack. The interceptors are
chained in a linked list, so we only need a
+ * reference to the first one
*/
protected ChannelInterceptor interceptors = null;
@@ -150,8 +147,7 @@ public class GroupChannel extends ChannelInterceptorBase
private ObjectName oname = null;
/**
- * Creates a GroupChannel. This constructor will also
- * add the first interceptor in the GroupChannel.<br>
+ * Creates a GroupChannel. This constructor will also add the first
interceptor in the GroupChannel.<br>
* The first interceptor is always the channel itself.
*/
public GroupChannel() {
@@ -161,14 +157,14 @@ public class GroupChannel extends ChannelInterceptorBase
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
- if ( interceptors == null ) {
+ if (interceptors == null) {
interceptors = interceptor;
interceptors.setNext(coordinator);
interceptors.setPrevious(null);
coordinator.setPrevious(interceptors);
} else {
ChannelInterceptor last = interceptors;
- while ( last.getNext() != coordinator ) {
+ while (last.getNext() != coordinator) {
last = last.getNext();
}
last.setNext(interceptor);
@@ -180,37 +176,36 @@ public class GroupChannel extends ChannelInterceptorBase
/**
* Sends a heartbeat through the interceptor stack.<br>
- * Invoke this method from the application on a periodic basis if
- * you have turned off internal heartbeats
<code>channel.setHeartbeat(false)</code>
+ * Invoke this method from the application on a periodic basis if you have
turned off internal heartbeats
+ * <code>channel.setHeartbeat(false)</code>
*/
@Override
public void heartbeat() {
super.heartbeat();
for (MembershipListener listener : membershipListeners) {
- if ( listener instanceof Heartbeat ) {
- ((Heartbeat)listener).heartbeat();
+ if (listener instanceof Heartbeat) {
+ ((Heartbeat) listener).heartbeat();
}
}
for (ChannelListener listener : channelListeners) {
- if ( listener instanceof Heartbeat ) {
- ((Heartbeat)listener).heartbeat();
+ if (listener instanceof Heartbeat) {
+ ((Heartbeat) listener).heartbeat();
}
}
}
@Override
- public UniqueId send(Member[] destination, Serializable msg, int options)
- throws ChannelException {
- return send(destination,msg,options,null);
+ public UniqueId send(Member[] destination, Serializable msg, int options)
throws ChannelException {
+ return send(destination, msg, options, null);
}
@Override
public UniqueId send(Member[] destination, Serializable msg, int options,
ErrorHandler handler)
throws ChannelException {
- if ( msg == null ) {
+ if (msg == null) {
throw new
ChannelException(sm.getString("groupChannel.nullMessage"));
}
XByteBuffer buffer = null;
@@ -218,41 +213,39 @@ public class GroupChannel extends ChannelInterceptorBase
if (destination == null || destination.length == 0) {
throw new
ChannelException(sm.getString("groupChannel.noDestination"));
}
- ChannelData data = new ChannelData(true);//generates a unique Id
+ ChannelData data = new ChannelData(true);// generates a unique Id
data.setAddress(getLocalMember(false));
data.setTimestamp(System.currentTimeMillis());
byte[] b = null;
- if ( msg instanceof ByteMessage ){
- b = ((ByteMessage)msg).getMessage();
+ if (msg instanceof ByteMessage) {
+ b = ((ByteMessage) msg).getMessage();
options = options | SEND_OPTIONS_BYTE_MESSAGE;
} else {
b = XByteBuffer.serialize(msg);
options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
}
data.setOptions(options);
- //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
- buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
- buffer.append(b,0,b.length);
+ // XByteBuffer buffer = new XByteBuffer(b.length+128,false);
+ buffer = BufferPool.getBufferPool().getBuffer(b.length + 128,
false);
+ buffer.append(b, 0, b.length);
data.setMessage(buffer);
InterceptorPayload payload = null;
- if ( handler != null ) {
+ if (handler != null) {
payload = new InterceptorPayload();
payload.setErrorHandler(handler);
}
getFirstInterceptor().sendMessage(destination, data, payload);
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new
UniqueId(data.getUniqueId()) +
- " at " + new
java.sql.Timestamp(System.currentTimeMillis()) + " to " +
- Arrays.toNameString(destination));
- Logs.MESSAGES.trace("GroupChannel - Send Message:" +
- new UniqueId(data.getUniqueId()) + " is " + msg);
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new
UniqueId(data.getUniqueId()) + " at " +
+ new java.sql.Timestamp(System.currentTimeMillis()) + "
to " + Arrays.toNameString(destination));
+ Logs.MESSAGES.trace("GroupChannel - Send Message:" + new
UniqueId(data.getUniqueId()) + " is " + msg);
}
return new UniqueId(data.getUniqueId());
} catch (RuntimeException | IOException e) {
throw new ChannelException(e);
} finally {
- if ( buffer != null ) {
+ if (buffer != null) {
BufferPool.getBufferPool().returnBuffer(buffer);
}
}
@@ -261,44 +254,39 @@ public class GroupChannel extends ChannelInterceptorBase
/**
* Callback from the interceptor stack. <br>
- * When a message is received from a remote node, this method will be
- * invoked by the previous interceptor.<br>
- * This method can also be used to send a message to other components
- * within the same application, but its an extreme case, and you're
probably
- * better off doing that logic between the applications itself.
+ * When a message is received from a remote node, this method will be
invoked by the previous interceptor.<br>
+ * This method can also be used to send a message to other components
within the same application, but its an
+ * extreme case, and you're probably better off doing that logic between
the applications itself.
+ *
* @param msg ChannelMessage
*/
@Override
public void messageReceived(ChannelMessage msg) {
- if ( msg == null ) {
+ if (msg == null) {
return;
}
try {
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel - Received msg:" +
- new UniqueId(msg.getUniqueId()) + " at " +
- new java.sql.Timestamp(System.currentTimeMillis()) + "
from " +
- msg.getAddress().getName());
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("GroupChannel - Received msg:" + new
UniqueId(msg.getUniqueId()) + " at " +
+ new java.sql.Timestamp(System.currentTimeMillis()) + "
from " + msg.getAddress().getName());
}
Serializable fwd = null;
- if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) ==
SEND_OPTIONS_BYTE_MESSAGE ) {
+ if ((msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) ==
SEND_OPTIONS_BYTE_MESSAGE) {
fwd = new ByteMessage(msg.getMessage().getBytes());
} else {
try {
- fwd =
XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
- msg.getMessage().getLength());
- }catch (Exception sx) {
- log.error(sm.getString("groupChannel.unable.deserialize",
msg),sx);
+ fwd =
XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
msg.getMessage().getLength());
+ } catch (Exception sx) {
+ log.error(sm.getString("groupChannel.unable.deserialize",
msg), sx);
return;
}
}
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel - Receive Message:" +
- new UniqueId(msg.getUniqueId()) + " is " + fwd);
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new
UniqueId(msg.getUniqueId()) + " is " + fwd);
}
- //get the actual member with the correct alive time
+ // get the actual member with the correct alive time
Member source = msg.getAddress();
boolean rx = false;
boolean delivered = false;
@@ -306,61 +294,62 @@ public class GroupChannel extends ChannelInterceptorBase
if (channelListener != null && channelListener.accept(fwd,
source)) {
channelListener.messageReceived(fwd, source);
delivered = true;
- //if the message was accepted by an RPC channel, that
channel
- //is responsible for returning the reply, otherwise we
send an absence reply
+ // if the message was accepted by an RPC channel, that
channel
+ // is responsible for returning the reply, otherwise we
send an absence reply
if (channelListener instanceof RpcChannel) {
rx = true;
}
}
- }//for
+ } // for
if ((!rx) && (fwd instanceof RpcMessage)) {
- //if we have a message that requires a response,
- //but none was given, send back an immediate one
- sendNoRpcChannelReply((RpcMessage)fwd,source);
+ // if we have a message that requires a response,
+ // but none was given, send back an immediate one
+ sendNoRpcChannelReply((RpcMessage) fwd, source);
}
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "]
id:" +
- new UniqueId(msg.getUniqueId()));
+ if (Logs.MESSAGES.isTraceEnabled()) {
+ Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "]
id:" + new UniqueId(msg.getUniqueId()));
}
- } catch ( Exception x ) {
- //this could be the channel listener throwing an exception, we
should log it
- //as a warning.
- if ( log.isWarnEnabled() ) {
- log.warn(sm.getString("groupChannel.receiving.error"),x);
+ } catch (Exception x) {
+ // this could be the channel listener throwing an exception, we
should log it
+ // as a warning.
+ if (log.isWarnEnabled()) {
+ log.warn(sm.getString("groupChannel.receiving.error"), x);
}
- throw new
RemoteProcessException(sm.getString("groupChannel.receiving.error"),x);
+ throw new
RemoteProcessException(sm.getString("groupChannel.receiving.error"), x);
}
}
/**
* Sends a <code>NoRpcChannelReply</code> message to a member<br>
- * This method gets invoked by the channel if an RPC message comes in
- * and no channel listener accepts the message. This avoids timeout
- * @param msg RpcMessage
+ * This method gets invoked by the channel if an RPC message comes in and
no channel listener accepts the message.
+ * This avoids timeout
+ *
+ * @param msg RpcMessage
* @param destination Member - the destination for the reply
*/
protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
try {
- //avoid circular loop
- if ( msg instanceof RpcMessage.NoRpcChannelReply) {
+ // avoid circular loop
+ if (msg instanceof RpcMessage.NoRpcChannelReply) {
return;
}
RpcMessage.NoRpcChannelReply reply = new
RpcMessage.NoRpcChannelReply(msg.rpcId, msg.uuid);
- send(new Member[]{destination}, reply, SEND_OPTIONS_ASYNCHRONOUS);
- } catch ( Exception x ) {
-
log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"),x);
+ send(new Member[] { destination }, reply,
SEND_OPTIONS_ASYNCHRONOUS);
+ } catch (Exception x) {
+ log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"),
x);
}
}
/**
- * memberAdded gets invoked by the interceptor below the channel
- * and the channel will broadcast it to the membership listeners
+ * memberAdded gets invoked by the interceptor below the channel and the
channel will broadcast it to the membership
+ * listeners
+ *
* @param member Member - the new member
*/
@Override
public void memberAdded(Member member) {
- //notify upwards
+ // notify upwards
for (MembershipListener membershipListener : membershipListeners) {
if (membershipListener != null) {
membershipListener.memberAdded(member);
@@ -369,13 +358,14 @@ public class GroupChannel extends ChannelInterceptorBase
}
/**
- * memberDisappeared gets invoked by the interceptor below the channel
- * and the channel will broadcast it to the membership listeners
+ * memberDisappeared gets invoked by the interceptor below the channel and
the channel will broadcast it to the
+ * membership listeners
+ *
* @param member Member - the member that left or crashed
*/
@Override
public void memberDisappeared(Member member) {
- //notify upwards
+ // notify upwards
for (MembershipListener membershipListener : membershipListeners) {
if (membershipListener != null) {
membershipListener.memberDisappeared(member);
@@ -384,13 +374,12 @@ public class GroupChannel extends ChannelInterceptorBase
}
/**
- * Sets up the default implementation interceptor stack
- * if no interceptors have been added
+ * Sets up the default implementation interceptor stack if no interceptors
have been added
+ *
* @throws ChannelException Cluster error
*/
protected synchronized void setupDefaultStack() throws ChannelException {
- if (getFirstInterceptor() != null &&
- ((getFirstInterceptor().getNext() instanceof
ChannelCoordinator))) {
+ if (getFirstInterceptor() != null && ((getFirstInterceptor().getNext()
instanceof ChannelCoordinator))) {
addInterceptor(new MessageDispatchInterceptor());
}
Iterator<ChannelInterceptor> interceptors = getInterceptors();
@@ -402,20 +391,21 @@ public class GroupChannel extends ChannelInterceptorBase
}
/**
- * Validates the option flags that each interceptor is using and reports
- * an error if two interceptor share the same flag.
+ * Validates the option flags that each interceptor is using and reports
an error if two interceptor share the same
+ * flag.
+ *
* @throws ChannelException Error with option flag
*/
protected void checkOptionFlags() throws ChannelException {
StringBuilder conflicts = new StringBuilder();
ChannelInterceptor first = interceptors;
- while ( first != null ) {
+ while (first != null) {
int flag = first.getOptionFlag();
- if ( flag != 0 ) {
+ if (flag != 0) {
ChannelInterceptor next = first.getNext();
- while ( next != null ) {
+ while (next != null) {
int nflag = next.getOptionFlag();
- if (nflag!=0 && (((flag & nflag) == flag ) || ((flag &
nflag) == nflag)) ) {
+ if (nflag != 0 && (((flag & nflag) == flag) || ((flag &
nflag) == nflag))) {
conflicts.append('[');
conflicts.append(first.getClass().getName());
conflicts.append(':');
@@ -425,15 +415,14 @@ public class GroupChannel extends ChannelInterceptorBase
conflicts.append(':');
conflicts.append(nflag);
conflicts.append("] ");
- }//end if
+ } // end if
next = next.getNext();
- }//while
- }//end if
+ } // while
+ } // end if
first = first.getNext();
- }//while
- if ( conflicts.length() > 0 ) {
- throw new
ChannelException(sm.getString("groupChannel.optionFlag.conflict",
- conflicts.toString()));
+ } // while
+ if (conflicts.length() > 0) {
+ throw new
ChannelException(sm.getString("groupChannel.optionFlag.conflict",
conflicts.toString()));
}
}
@@ -470,8 +459,8 @@ public class GroupChannel extends ChannelInterceptorBase
log.error(sm.getString("groupChannel.unable.sendHeartbeat"), e);
}
}
- heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new
HeartbeatRunnable(),
- heartbeatSleeptime, heartbeatSleeptime,
TimeUnit.MILLISECONDS);
+ heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new
HeartbeatRunnable(), heartbeatSleeptime,
+ heartbeatSleeptime, TimeUnit.MILLISECONDS);
}
}
@@ -499,6 +488,7 @@ public class GroupChannel extends ChannelInterceptorBase
/**
* Returns the first interceptor of the stack. Useful for traversal.
+ *
* @return ChannelInterceptor
*/
public ChannelInterceptor getFirstInterceptor() {
@@ -551,7 +541,7 @@ public class GroupChannel extends ChannelInterceptorBase
@Override
public void addMembershipListener(MembershipListener membershipListener) {
- if (!this.membershipListeners.contains(membershipListener) ) {
+ if (!this.membershipListeners.contains(membershipListener)) {
this.membershipListeners.add(membershipListener);
}
}
@@ -563,11 +553,11 @@ public class GroupChannel extends ChannelInterceptorBase
@Override
public void addChannelListener(ChannelListener channelListener) {
- if (!this.channelListeners.contains(channelListener) ) {
+ if (!this.channelListeners.contains(channelListener)) {
this.channelListeners.add(channelListener);
} else {
- throw new
IllegalArgumentException(sm.getString("groupChannel.listener.alreadyExist",
- channelListener,channelListener.getClass().getName()));
+ throw new
IllegalArgumentException(sm.getString("groupChannel.listener.alreadyExist",
channelListener,
+ channelListener.getClass().getName()));
}
}
@@ -578,14 +568,14 @@ public class GroupChannel extends ChannelInterceptorBase
@Override
public Iterator<ChannelInterceptor> getInterceptors() {
- return new InterceptorIterator(this.getNext(),this.coordinator);
+ return new InterceptorIterator(this.getNext(), this.coordinator);
}
/**
* Enables/disables the option check<br>
- * Setting this to true, will make the GroupChannel perform a conflict
check
- * on the interceptors. If two interceptors are using the same option flag
- * and throw an error upon start.
+ * Setting this to true, will make the GroupChannel perform a conflict
check on the interceptors. If two
+ * interceptors are using the same option flag and throw an error upon
start.
+ *
* @param optionCheck boolean
*/
public void setOptionCheck(boolean optionCheck) {
@@ -595,6 +585,7 @@ public class GroupChannel extends ChannelInterceptorBase
/**
* Configure local heartbeat sleep time<br>
* Only used when <code>getHeartbeat()==true</code>
+ *
* @param heartbeatSleeptime long - time in milliseconds to sleep between
heartbeats
*/
public void setHeartbeatSleeptime(long heartbeatSleeptime) {
@@ -602,9 +593,9 @@ public class GroupChannel extends ChannelInterceptorBase
}
/**
- * Enables or disables local heartbeat.
- * if <code>setHeartbeat(true)</code> is invoked then the channel will
start an internal
- * thread to invoke <code>Channel.heartbeat()</code> every
<code>getHeartbeatSleeptime</code> milliseconds
+ * Enables or disables local heartbeat. if <code>setHeartbeat(true)</code>
is invoked then the channel will start an
+ * internal thread to invoke <code>Channel.heartbeat()</code> every
<code>getHeartbeatSleeptime</code> milliseconds
+ *
* @param heartbeat boolean
*/
@Override
@@ -623,8 +614,8 @@ public class GroupChannel extends ChannelInterceptorBase
}
/**
- * @return the sleep time in milliseconds that the internal heartbeat will
- * sleep in between invocations of <code>Channel.heartbeat()</code>
+ * @return the sleep time in milliseconds that the internal heartbeat will
sleep in between invocations of
+ * <code>Channel.heartbeat()</code>
*/
@Override
public long getHeartbeatSleeptime() {
@@ -672,8 +663,7 @@ public class GroupChannel extends ChannelInterceptorBase
}
@Override
- public ObjectName preRegister(MBeanServer server, ObjectName name)
- throws Exception {
+ public ObjectName preRegister(MBeanServer server, ObjectName name) throws
Exception {
// NOOP
return null;
}
@@ -699,6 +689,7 @@ public class GroupChannel extends ChannelInterceptorBase
public static class InterceptorIterator implements
Iterator<ChannelInterceptor> {
private final ChannelInterceptor end;
private ChannelInterceptor start;
+
public InterceptorIterator(ChannelInterceptor start,
ChannelInterceptor end) {
this.end = end;
this.start = start;
@@ -706,13 +697,13 @@ public class GroupChannel extends ChannelInterceptorBase
@Override
public boolean hasNext() {
- return start!=null && start != end;
+ return start != null && start != end;
}
@Override
public ChannelInterceptor next() {
ChannelInterceptor result = null;
- if ( hasNext() ) {
+ if (hasNext()) {
result = start;
start = start.getNext();
}
@@ -721,15 +712,17 @@ public class GroupChannel extends ChannelInterceptorBase
@Override
public void remove() {
- //empty operation
+ // empty operation
}
}
/**
- * <p>Title: Internal heartbeat runnable</p>
- *
- * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a
thread of this class
- * is created</p>
+ * <p>
+ * Title: Internal heartbeat runnable
+ * </p>
+ * <p>
+ * Description: if <code>Channel.getHeartbeat()==true</code> then a thread
of this class is created
+ * </p>
*/
public class HeartbeatRunnable implements Runnable {
@Override
diff --git a/java/org/apache/catalina/tribes/group/GroupChannelMBean.java
b/java/org/apache/catalina/tribes/group/GroupChannelMBean.java
index 83bbaa6487..aecc054132 100644
--- a/java/org/apache/catalina/tribes/group/GroupChannelMBean.java
+++ b/java/org/apache/catalina/tribes/group/GroupChannelMBean.java
@@ -39,11 +39,9 @@ public interface GroupChannelMBean {
void stop(int svc) throws ChannelException;
- UniqueId send(Member[] destination, Serializable msg, int options)
- throws ChannelException;
+ UniqueId send(Member[] destination, Serializable msg, int options) throws
ChannelException;
- UniqueId send(Member[] destination, Serializable msg, int options,
ErrorHandler handler)
- throws ChannelException;
+ UniqueId send(Member[] destination, Serializable msg, int options,
ErrorHandler handler) throws ChannelException;
void addMembershipListener(MembershipListener listener);
@@ -53,9 +51,9 @@ public interface GroupChannelMBean {
void removeChannelListener(ChannelListener listener);
- boolean hasMembers() ;
+ boolean hasMembers();
- Member[] getMembers() ;
+ Member[] getMembers();
Member getLocalMember(boolean incAlive);
diff --git a/java/org/apache/catalina/tribes/group/InterceptorPayload.java
b/java/org/apache/catalina/tribes/group/InterceptorPayload.java
index d0b44c812f..3aceeea3d7 100644
--- a/java/org/apache/catalina/tribes/group/InterceptorPayload.java
+++ b/java/org/apache/catalina/tribes/group/InterceptorPayload.java
@@ -18,7 +18,7 @@ package org.apache.catalina.tribes.group;
import org.apache.catalina.tribes.ErrorHandler;
-public class InterceptorPayload {
+public class InterceptorPayload {
private ErrorHandler errorHandler;
public ErrorHandler getErrorHandler() {
diff --git a/java/org/apache/catalina/tribes/group/Response.java
b/java/org/apache/catalina/tribes/group/Response.java
index 91aa73fba5..6d53e1b687 100644
--- a/java/org/apache/catalina/tribes/group/Response.java
+++ b/java/org/apache/catalina/tribes/group/Response.java
@@ -26,6 +26,7 @@ import org.apache.catalina.tribes.Member;
public class Response {
private Member source;
private Serializable message;
+
public Response() {
}
diff --git a/java/org/apache/catalina/tribes/group/RpcCallback.java
b/java/org/apache/catalina/tribes/group/RpcCallback.java
index 83ee4ec339..e376a7eefe 100644
--- a/java/org/apache/catalina/tribes/group/RpcCallback.java
+++ b/java/org/apache/catalina/tribes/group/RpcCallback.java
@@ -21,23 +21,26 @@ import java.io.Serializable;
import org.apache.catalina.tribes.Member;
/**
- * The RpcCallback interface is an interface for the Tribes channel to request
a
- * response object to a request that came in.
+ * The RpcCallback interface is an interface for the Tribes channel to request
a response object to a request that came
+ * in.
*/
public interface RpcCallback {
/**
* Allows sending a response to a received message.
- * @param msg The message
+ *
+ * @param msg The message
* @param sender Member
+ *
* @return Serializable object, <code>null</code> if no reply should be
sent
*/
Serializable replyRequest(Serializable msg, Member sender);
/**
- * If the reply has already been sent to the requesting thread,
- * the rpc callback can handle any data that comes in after the fact.
- * @param msg The message
+ * If the reply has already been sent to the requesting thread, the rpc
callback can handle any data that comes in
+ * after the fact.
+ *
+ * @param msg The message
* @param sender Member
*/
void leftOver(Serializable msg, Member sender);
diff --git a/java/org/apache/catalina/tribes/group/RpcChannel.java
b/java/org/apache/catalina/tribes/group/RpcChannel.java
index c72963f2e5..6f0b0c087b 100644
--- a/java/org/apache/catalina/tribes/group/RpcChannel.java
+++ b/java/org/apache/catalina/tribes/group/RpcChannel.java
@@ -50,13 +50,13 @@ public class RpcChannel implements ChannelListener {
private byte[] rpcId;
private int replyMessageOptions = 0;
- private final ConcurrentMap<RpcCollectorKey, RpcCollector> responseMap =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<RpcCollectorKey,RpcCollector> responseMap =
new ConcurrentHashMap<>();
/**
- * Create an RPC channel. You can have several RPC channels attached to a
group
- * all separated out by the uniqueness
- * @param rpcId - the unique Id for this RPC group
- * @param channel Channel
+ * Create an RPC channel. You can have several RPC channels attached to a
group all separated out by the uniqueness
+ *
+ * @param rpcId - the unique Id for this RPC group
+ * @param channel Channel
* @param callback RpcCallback
*/
public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
@@ -69,42 +69,41 @@ public class RpcChannel implements ChannelListener {
/**
* Send a message and wait for the response.
- * @param destination Member[] - the destination for the message, and the
members you request a reply from
- * @param message Serializable - the message you are sending out
- * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
+ *
+ * @param destination Member[] - the destination for the message, and
the members you request a reply from
+ * @param message Serializable - the message you are sending out
+ * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
* @param channelOptions channel sender options
- * @param timeout long - timeout in milliseconds, if no reply is received
within this time null is returned
+ * @param timeout long - timeout in milliseconds, if no reply is
received within this time null is returned
+ *
* @return Response[] - an array of response objects.
+ *
* @throws ChannelException Error sending message
*/
- public Response[] send(Member[] destination,
- Serializable message,
- int rpcOptions,
- int channelOptions,
- long timeout) throws ChannelException {
+ public Response[] send(Member[] destination, Serializable message, int
rpcOptions, int channelOptions, long timeout)
+ throws ChannelException {
- if ( destination==null || destination.length == 0 ) {
+ if (destination == null || destination.length == 0) {
return new Response[0];
}
- //avoid dead lock
- int sendOptions =
- channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+ // avoid dead lock
+ int sendOptions = channelOptions &
~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
RpcCollectorKey key = new
RpcCollectorKey(UUIDGenerator.randomUUID(false));
- RpcCollector collector = new
RpcCollector(key,rpcOptions,destination.length);
+ RpcCollector collector = new RpcCollector(key, rpcOptions,
destination.length);
try {
synchronized (collector) {
- if ( rpcOptions != NO_REPLY ) {
+ if (rpcOptions != NO_REPLY) {
responseMap.put(key, collector);
}
RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
channel.send(destination, rmsg, sendOptions);
- if ( rpcOptions != NO_REPLY ) {
+ if (rpcOptions != NO_REPLY) {
collector.wait(timeout);
}
}
- } catch ( InterruptedException ix ) {
+ } catch (InterruptedException ix) {
Thread.currentThread().interrupt();
} finally {
responseMap.remove(key);
@@ -114,9 +113,9 @@ public class RpcChannel implements ChannelListener {
@Override
public void messageReceived(Serializable msg, Member sender) {
- RpcMessage rmsg = (RpcMessage)msg;
+ RpcMessage rmsg = (RpcMessage) msg;
RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
- if ( rmsg.reply ) {
+ if (rmsg.reply) {
RpcCollector collector = responseMap.get(key);
if (collector == null) {
if (!(rmsg instanceof RpcMessage.NoRpcChannelReply)) {
@@ -124,9 +123,9 @@ public class RpcChannel implements ChannelListener {
}
} else {
synchronized (collector) {
- //make sure it hasn't been removed
- if ( responseMap.containsKey(key) ) {
- if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) {
+ // make sure it hasn't been removed
+ if (responseMap.containsKey(key)) {
+ if ((rmsg instanceof RpcMessage.NoRpcChannelReply)) {
collector.destcnt--;
} else {
collector.addResponse(rmsg.message, sender);
@@ -135,27 +134,30 @@ public class RpcChannel implements ChannelListener {
collector.notifyAll();
}
} else {
- if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) )
{
+ if (!(rmsg instanceof RpcMessage.NoRpcChannelReply)) {
callback.leftOver(rmsg.message, sender);
}
}
- }//synchronized
- }//end if
+ } // synchronized
+ } // end if
} else {
boolean finished = false;
- final ExtendedRpcCallback excallback = (callback instanceof
ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null;
- boolean asyncReply = ((replyMessageOptions &
Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS);
- Serializable reply = callback.replyRequest(rmsg.message,sender);
+ final ExtendedRpcCallback excallback =
+ (callback instanceof ExtendedRpcCallback) ?
((ExtendedRpcCallback) callback) : null;
+ boolean asyncReply =
+ ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS)
== Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ Serializable reply = callback.replyRequest(rmsg.message, sender);
ErrorHandler handler = null;
final Serializable request = msg;
final Serializable response = reply;
final Member fsender = sender;
- if (excallback!=null && asyncReply) {
+ if (excallback != null && asyncReply) {
handler = new ErrorHandler() {
@Override
public void handleError(ChannelException x, UniqueId id) {
excallback.replyFailed(request, response, fsender, x);
}
+
@Override
public void handleCompletion(UniqueId id) {
excallback.replySucceeded(request, response, fsender);
@@ -165,23 +167,25 @@ public class RpcChannel implements ChannelListener {
rmsg.reply = true;
rmsg.message = reply;
try {
- if (handler!=null) {
- channel.send(new Member[] {sender},
rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
+ if (handler != null) {
+ channel.send(new Member[] { sender }, rmsg,
+ replyMessageOptions &
~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
} else {
- channel.send(new Member[] {sender},
rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ channel.send(new Member[] { sender }, rmsg,
+ replyMessageOptions &
~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
}
finished = true;
- } catch ( Exception x ) {
+ } catch (Exception x) {
if (excallback != null && !asyncReply) {
excallback.replyFailed(rmsg.message, reply, sender, x);
} else {
- log.error(sm.getString("rpcChannel.replyFailed"),x);
+ log.error(sm.getString("rpcChannel.replyFailed"), x);
}
}
if (finished && excallback != null && !asyncReply) {
excallback.replySucceeded(rmsg.message, reply, sender);
}
- }//end if
+ } // end if
}
public void breakdown() {
@@ -190,9 +194,9 @@ public class RpcChannel implements ChannelListener {
@Override
public boolean accept(Serializable msg, Member sender) {
- if ( msg instanceof RpcMessage ) {
- RpcMessage rmsg = (RpcMessage)msg;
- return Arrays.equals(rmsg.rpcId,rpcId);
+ if (msg instanceof RpcMessage) {
+ RpcMessage rmsg = (RpcMessage) msg;
+ return Arrays.equals(rmsg.rpcId, rpcId);
} else {
return false;
}
@@ -246,22 +250,22 @@ public class RpcChannel implements ChannelListener {
}
public void addResponse(Serializable message, Member sender) {
- Response resp = new Response(sender,message);
+ Response resp = new Response(sender, message);
responses.add(resp);
}
public boolean isComplete() {
- if ( destcnt <= 0 ) {
+ if (destcnt <= 0) {
return true;
}
switch (options) {
case ALL_REPLY:
return destcnt == responses.size();
case MAJORITY_REPLY:
- float perc = ((float)responses.size()) / ((float)destcnt);
+ float perc = ((float) responses.size()) / ((float)
destcnt);
return perc >= 0.50f;
case FIRST_REPLY:
- return responses.size()>0;
+ return responses.size() > 0;
default:
return false;
}
@@ -274,8 +278,8 @@ public class RpcChannel implements ChannelListener {
@Override
public boolean equals(Object o) {
- if ( o instanceof RpcCollector ) {
- RpcCollector r = (RpcCollector)o;
+ if (o instanceof RpcCollector) {
+ RpcCollector r = (RpcCollector) o;
return r.key.equals(this.key);
} else {
return false;
@@ -289,20 +293,21 @@ public class RpcChannel implements ChannelListener {
public static class RpcCollectorKey {
final byte[] id;
+
public RpcCollectorKey(byte[] id) {
this.id = id;
}
@Override
public int hashCode() {
- return id[0]+id[1]+id[2]+id[3];
+ return id[0] + id[1] + id[2] + id[3];
}
@Override
public boolean equals(Object o) {
- if ( o instanceof RpcCollectorKey ) {
- RpcCollectorKey r = (RpcCollectorKey)o;
- return Arrays.equals(id,r.id);
+ if (o instanceof RpcCollectorKey) {
+ RpcCollectorKey r = (RpcCollectorKey) o;
+ return Arrays.equals(id, r.id);
} else {
return false;
}
diff --git a/java/org/apache/catalina/tribes/group/RpcMessage.java
b/java/org/apache/catalina/tribes/group/RpcMessage.java
index 0fc804d219..052befc9e5 100644
--- a/java/org/apache/catalina/tribes/group/RpcMessage.java
+++ b/java/org/apache/catalina/tribes/group/RpcMessage.java
@@ -32,7 +32,7 @@ public class RpcMessage implements Externalizable {
protected boolean reply = false;
public RpcMessage() {
- //for serialization
+ // for serialization
}
public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
@@ -42,7 +42,7 @@ public class RpcMessage implements Externalizable {
}
@Override
- public void readExternal(ObjectInput in) throws
IOException,ClassNotFoundException {
+ public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
reply = in.readBoolean();
int length = in.readInt();
uuid = new byte[length];
@@ -50,7 +50,7 @@ public class RpcMessage implements Externalizable {
length = in.readInt();
rpcId = new byte[length];
in.readFully(rpcId);
- message = (Serializable)in.readObject();
+ message = (Serializable) in.readObject();
}
@Override
@@ -82,7 +82,7 @@ public class RpcMessage implements Externalizable {
}
public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
- super(rpcid,uuid,null);
+ super(rpcid, uuid, null);
reply = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]