Author: azeez Date: Tue Dec 21 10:01:54 2010 New Revision: 1051441 URL: http://svn.apache.org/viewvc?rev=1051441&view=rev Log: Code improvements based on FindBugs feedback
Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java Removed: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/management/commands/ShutdownMemberCommand.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/state/DefaultStateManager.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaMembershipService.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java Tue Dec 21 10:01:54 2010 @@ -37,6 +37,8 @@ import java.util.Random; */ public class ClusteringUtils { + private static final Random RANDOM = new Random(); + /** * Load a ServiceGroup having name <code>serviceGroupName</code> * @@ -55,14 +57,23 @@ public class ClusteringUtils { 
String axis2Repo = System.getProperty(Constants.AXIS2_REPO); if (isURL(axis2Repo)) { DataHandler dh = new DataHandler(new URL(axis2Repo + "services/" + serviceGroupName)); - String tempDir = + String tempDirName = tempDirectory + File.separator + - (System.currentTimeMillis() + new Random().nextDouble()); - new File(tempDir).mkdirs(); - serviceArchive = new File(tempDir + File.separator + serviceGroupName); - FileOutputStream out = new FileOutputStream(serviceArchive); - dh.writeTo(out); - out.close(); + (System.currentTimeMillis() + RANDOM.nextDouble()); + if(!new File(tempDirName).mkdirs()) { + throw new Exception("Could not create temp dir " + tempDirName); + } + serviceArchive = new File(tempDirName + File.separator + serviceGroupName); + FileOutputStream out = null; + try { + out = new FileOutputStream(serviceArchive); + dh.writeTo(out); + out.close(); + } finally { + if (out != null) { + out.close(); + } + } } else { serviceArchive = new File(axis2Repo + File.separator + "services" + File.separator + serviceGroupName); Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Tue Dec 21 10:01:54 2010 @@ -30,8 +30,9 @@ import java.util.Arrays; */ public class MemberJoinedCommand extends ControlCommand { + private static final long serialVersionUID = -6596472883950279349L; private Member member; - private MembershipManager membershipManager; + private transient MembershipManager 
membershipManager; public void setMembershipManager(MembershipManager membershipManager) { this.membershipManager = membershipManager; Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Tue Dec 21 10:01:54 2010 @@ -34,9 +34,10 @@ import java.util.Arrays; public class MemberListCommand extends ControlCommand { private static final Log log = LogFactory.getLog(MemberListCommand.class); + private static final long serialVersionUID = 5687720124889269491L; private Member[] members; - private MembershipManager membershipManager; + private transient MembershipManager membershipManager; public void setMembershipManager(MembershipManager membershipManager) { this.membershipManager = membershipManager; Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/management/commands/ShutdownMemberCommand.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/management/commands/ShutdownMemberCommand.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/management/commands/ShutdownMemberCommand.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/management/commands/ShutdownMemberCommand.java Tue Dec 21 10:01:54 2010 
@@ -20,7 +20,7 @@ import org.apache.axis2.clustering.manag import org.apache.axis2.context.ConfigurationContext; /** - * + * This command is sent when a node in the cluster needs to be shut down */ public class ShutdownMemberCommand extends GroupManagementCommand { public void execute(ConfigurationContext configContext) throws ClusteringFault{ Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/state/DefaultStateManager.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/state/DefaultStateManager.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/state/DefaultStateManager.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/state/DefaultStateManager.java Tue Dec 21 10:01:54 2010 @@ -35,10 +35,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * This class is the default StateManager of the Apache Tribes based clustering implementation + */ public class DefaultStateManager implements StateManager { - private ConfigurationContext configContext; - private final Map<String, Parameter> parameters = new HashMap<String, Parameter>(); private ChannelSender sender; @@ -91,7 +92,7 @@ public class DefaultStateManager impleme } public void setConfigurationContext(ConfigurationContext configurationContext) { - this.configContext = configurationContext; + // Nothing to do here } public void setReplicationExcludePatterns(String contextType, List patterns) { Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java URL: 
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Tue Dec 21 10:01:54 2010 @@ -33,7 +33,7 @@ import java.util.Map; /** * Message intereceptor for handling at-most-once message processing semantics */ -public class AtMostOnceInterceptor extends ChannelInterceptorBase { +public final class AtMostOnceInterceptor extends ChannelInterceptorBase { private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class); private static final Map<MessageId, Long> receivedMessages = Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java?rev=1051441&view=auto ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java (added) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java Tue Dec 21 10:01:54 2010 @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.clustering.tribes; + +import org.apache.axis2.clustering.ClusteringConstants; +import org.apache.axis2.clustering.ClusteringFault; +import org.apache.axis2.clustering.management.DefaultNodeManager; +import org.apache.axis2.clustering.management.GroupManagementCommand; +import org.apache.axis2.clustering.management.NodeManagementCommand; +import org.apache.axis2.clustering.state.DefaultStateManager; +import org.apache.axis2.clustering.state.StateClusteringCommand; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.AxisModule; +import org.apache.axis2.description.AxisServiceGroup; +import org.apache.axis2.engine.AxisConfiguration; +import org.apache.catalina.tribes.ByteMessage; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.RemoteProcessException; +import org.apache.catalina.tribes.group.RpcMessage; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * This is the Tribes channel listener which is used for listening on the channels, receiving + * messages & accepting messages. 
+ */ +public class Axis2ChannelListener implements ChannelListener { + private static final Log log = LogFactory.getLog(Axis2ChannelListener.class); + + private DefaultStateManager stateManager; + private DefaultNodeManager nodeManager; + + private ConfigurationContext configurationContext; + + public Axis2ChannelListener(ConfigurationContext configurationContext, + DefaultNodeManager nodeManager, + DefaultStateManager stateManager) { + this.nodeManager = nodeManager; + this.stateManager = stateManager; + this.configurationContext = configurationContext; + } + + public void setStateManager(DefaultStateManager stateManager) { + this.stateManager = stateManager; + } + + public void setNodeManager(DefaultNodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + public void setConfigurationContext(ConfigurationContext configurationContext) { + this.configurationContext = configurationContext; + } + + /** + * Invoked by the channel to determine if the listener will process this message or not. 
+ * @param msg Serializable + * @param sender Member + * @return boolean + */ + public boolean accept(Serializable msg, Member sender) { + return !(msg instanceof RpcMessage); // RpcMessages will not be handled by this listener + } + + /** + * Receive a message from the channel + * @param msg Serializable + * @param sender - the source of the message + */ + public void messageReceived(Serializable msg, Member sender) { + try { + AxisConfiguration configuration = configurationContext.getAxisConfiguration(); + List<ClassLoader> classLoaders = new ArrayList<ClassLoader>(); + classLoaders.add(configuration.getSystemClassLoader()); + classLoaders.add(getClass().getClassLoader()); + for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) { + AxisServiceGroup group = (AxisServiceGroup) iter.next(); + classLoaders.add(group.getServiceGroupClassLoader()); + } + for(Object obj: configuration.getModules().values()){ + AxisModule module = (AxisModule) obj; + classLoaders.add(module.getModuleClassLoader()); + } + byte[] message = ((ByteMessage) msg).getMessage(); + msg = XByteBuffer.deserialize(message, + 0, + message.length, + classLoaders.toArray(new ClassLoader[classLoaders.size()])); + } catch (Exception e) { + String errMsg = "Cannot deserialize received message"; + log.error(errMsg, e); + throw new RemoteProcessException(errMsg, e); + } + + // If the system has not yet been initialized, reject all incoming messages, except the + // GetStateResponseCommand message + if (configurationContext. 
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) { + log.warn("Received message " + msg + + " before cluster initialization has been completed from " + + TribesUtil.getName(sender)); + return; + } + if (log.isDebugEnabled()) { + log.debug("Received message " + msg + " from " + TribesUtil.getName(sender)); + } + + try { + processMessage(msg); + } catch (Exception e) { + String errMsg = "Cannot process received message"; + log.error(errMsg, e); + throw new RemoteProcessException(errMsg, e); + } + } + + private void processMessage(Serializable msg) throws ClusteringFault { + if (msg instanceof StateClusteringCommand && stateManager != null) { + StateClusteringCommand ctxCmd = (StateClusteringCommand) msg; + ctxCmd.execute(configurationContext); + } else if (msg instanceof NodeManagementCommand && nodeManager != null) { + ((NodeManagementCommand) msg).execute(configurationContext); + } else if (msg instanceof GroupManagementCommand){ + ((GroupManagementCommand) msg).execute(configurationContext); + } + } +} Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Tue Dec 21 10:01:54 2010 @@ -61,6 +61,7 @@ import org.apache.commons.logging.LogFac import javax.xml.namespace.QName; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import 
java.util.HashMap; @@ -84,7 +85,7 @@ public class TribesClusteringAgent imple private ManagedChannel channel; private RpcChannel rpcInitChannel; private ConfigurationContext configurationContext; - private ChannelListener channelListener; + private Axis2ChannelListener axis2ChannelListener; private ChannelSender channelSender; private MembershipManager primaryMembershipManager; private RpcInitializationRequestHandler rpcInitRequestHandler; @@ -152,9 +153,9 @@ public class TribesClusteringAgent imple channel = new GroupChannel(); channelSender = new ChannelSender(channel, primaryMembershipManager, synchronizeAllMembers()); - channelListener = - new ChannelListener(configurationContext, configurationManager, contextManager); - channel.addChannelListener(channelListener); + axis2ChannelListener = + new Axis2ChannelListener(configurationContext, configurationManager, contextManager); + channel.addChannelListener(axis2ChannelListener); byte[] domain = getClusterDomain(); log.info("Cluster domain: " + new String(domain)); @@ -206,7 +207,7 @@ public class TribesClusteringAgent imple // If context replication is enabled, get the latest state from a neighbour if (contextManager != null) { contextManager.setSender(channelSender); - channelListener.setStateManager(contextManager); + axis2ChannelListener.setStateManager(contextManager); initializeSystem(new GetStateCommand()); ClusteringContextListener contextListener = new ClusteringContextListener(channelSender); configurationContext.addContextListener(contextListener); @@ -565,7 +566,15 @@ public class TribesClusteringAgent imple "or boolean parameter"); } - } catch (Exception e) { + } catch (InvocationTargetException e) { + handleException("Error invoking setter method named : " + mName + + "() that takes a single String, int, long, float, double " + + "or boolean parameter", e); + } catch (NoSuchMethodException e) { + handleException("Error invoking setter method named : " + mName + + "() that takes a single String, int, 
long, float, double " + + "or boolean parameter", e); + } catch (IllegalAccessException e) { handleException("Error invoking setter method named : " + mName + "() that takes a single String, int, long, float, double " + "or boolean parameter", e); @@ -701,7 +710,7 @@ public class TribesClusteringAgent imple if (channel != null) { try { channel.removeChannelListener(rpcInitChannel); - channel.removeChannelListener(channelListener); + channel.removeChannelListener(axis2ChannelListener); channel.stop(Channel.DEFAULT); } catch (ChannelException e) { @@ -720,8 +729,8 @@ public class TribesClusteringAgent imple if (rpcInitRequestHandler != null) { rpcInitRequestHandler.setConfigurationContext(configurationContext); } - if (channelListener != null) { - channelListener.setConfigurationContext(configurationContext); + if (axis2ChannelListener != null) { + axis2ChannelListener.setConfigurationContext(configurationContext); } if (configurationManager != null) { configurationManager.setConfigurationContext(configurationContext); Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaMembershipService.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaMembershipService.java?rev=1051441&r1=1051440&r2=1051441&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaMembershipService.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaMembershipService.java Tue Dec 21 10:01:54 2010 @@ -38,10 +38,16 @@ public class WkaMembershipService implem */ protected Properties properties = new Properties(); + /** + * This payload contains some membership information, such as some member specific properties + * e.g. 
HTTP/S ports + */ protected byte[] payload; + /** + * The domain name of this cluster + */ protected byte[] domain; - private MembershipListener membershipListener; public WkaMembershipService(MembershipManager membershipManager) { this.membershipManager = membershipManager; @@ -129,11 +135,11 @@ public class WkaMembershipService implem } public void setMembershipListener(MembershipListener membershipListener) { - this.membershipListener = membershipListener; + // Nothing to do } public void removeMembershipListener() { - this.membershipListener = null; + // Nothing to do } public void setPayload(byte[] payload) {