Author: remm
Date: Thu Nov  8 10:43:00 2018
New Revision: 1846119

URL: http://svn.apache.org/viewvc?rev=1846119&view=rev
Log:
Refactor various operations in Tribes to run on a scheduled executor.
When Tribes is not running standalone, it will use the executor from the
Catalina Service.
If running independently (as in the test suite), the Channel will provide
its own executor.

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
    tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties
    
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Thu Nov  8 
10:43:00 2018
@@ -18,6 +18,7 @@ package org.apache.catalina.tribes;
 
 import java.io.Serializable;
 import java.util.StringJoiner;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -374,6 +375,18 @@ public interface Channel {
     public void setName(String name);
 
     /**
+     * Return executor that can be used for utility tasks.
+     * @return the executor
+     */
+    public ScheduledExecutorService getUtilityExecutor();
+
+    /**
+     * Set the executor that can be used for utility tasks.
+     * @param utilityExecutor the executor
+     */
+    public void setUtilityExecutor(ScheduledExecutorService utilityExecutor);
+
+    /**
      * Translates the name of an option to its integer value.  Valid option 
names are "asynchronous" (alias "async"),
      * "byte_message" (alias "byte"), "multicast", "secure", 
"synchronized_ack" (alias "sync"), "udp", "use_ack"
      * @param opt The name of the option

Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/GroupChannel.java Thu 
Nov  8 10:43:00 2018
@@ -22,6 +22,10 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -72,6 +76,7 @@ public class GroupChannel extends Channe
      * If set to true, the channel will start a local thread for the heart 
beat.
      */
     protected boolean heartbeat = true;
+
     /**
      * If <code>heartbeat == true</code> then how often do we want this
      * heartbeat to run. default is one minute
@@ -79,9 +84,9 @@ public class GroupChannel extends Channe
     protected long heartbeatSleeptime = 5*1000;//every 5 seconds
 
     /**
-     * Internal heartbeat thread
+     * Internal heartbeat future
      */
-    protected HeartbeatThread hbthread = null;
+    protected ScheduledFuture<?> heartbeatFuture = null;
 
     /**
      * The  <code>ChannelCoordinator</code> coordinates the bottom layer 
components:<br>
@@ -134,6 +139,11 @@ public class GroupChannel extends Channe
     private boolean jmxEnabled = true;
 
     /**
+     * Executor service.
+     */
+    protected ScheduledExecutorService utilityExecutor = null;
+
+    /**
      * the ObjectName of this channel.
      */
     private ObjectName oname = null;
@@ -446,6 +456,8 @@ public class GroupChannel extends Channe
 
     }
 
+    protected boolean ownExecutor = false;
+
     /**
      * Starts the channel.
      * @param svc int - what service to start
@@ -459,10 +471,15 @@ public class GroupChannel extends Channe
         // register jmx
         JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this);
         if (jmxRegistry != null) this.oname = 
jmxRegistry.registerJmx(",component=Channel", this);
+        if (utilityExecutor == null) {
+            log.warn(sm.getString("groupChannel.warn.noUtilityExecutor"));
+            utilityExecutor = new ScheduledThreadPoolExecutor(1);
+            ownExecutor = true;
+        }
         super.start(svc);
-        if ( hbthread == null && heartbeat ) {
-            hbthread = new HeartbeatThread(this,heartbeatSleeptime);
-            hbthread.start();
+        if (heartbeatFuture == null && heartbeat) {
+            heartbeatFuture = utilityExecutor.scheduleWithFixedDelay
+                    (new HeartbeatRunnable(), heartbeatSleeptime, 
heartbeatSleeptime, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -474,11 +491,16 @@ public class GroupChannel extends Channe
      */
     @Override
     public synchronized void stop(int svc) throws ChannelException {
-        if (hbthread != null) {
-            hbthread.stopHeartbeat();
-            hbthread = null;
+        if (heartbeatFuture != null) {
+            heartbeatFuture.cancel(true);
+            heartbeatFuture = null;
         }
         super.stop(svc);
+        if (ownExecutor) {
+            utilityExecutor.shutdown();
+            utilityExecutor = null;
+            ownExecutor = false;
+        }
         if (oname != null) {
             JmxRegistry.getRegistry(this).unregisterJmx(oname);
             oname = null;
@@ -494,6 +516,16 @@ public class GroupChannel extends Channe
         else return coordinator;
     }
 
+    @Override
+    public ScheduledExecutorService getUtilityExecutor() {
+        return utilityExecutor;
+    }
+
+    @Override
+    public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
+        this.utilityExecutor = utilityExecutor;
+    }
+
     /**
      * Returns the channel receiver component
      * @return ChannelReceiver
@@ -764,56 +796,20 @@ public class GroupChannel extends Channe
     }
 
     /**
-     *
-     * <p>Title: Internal heartbeat thread</p>
+     * <p>Title: Internal heartbeat runnable</p>
      *
      * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a 
thread of this class
      * is created</p>
-     *
-     * @version 1.0
      */
-    public static class HeartbeatThread extends Thread {
-        private static final Log log = 
LogFactory.getLog(HeartbeatThread.class);
-        protected static int counter = 1;
-        protected static synchronized int inc() {
-            return counter++;
-        }
-
-        protected volatile boolean doRun = true;
-        protected final GroupChannel channel;
-        protected final long sleepTime;
-        public HeartbeatThread(GroupChannel channel, long sleepTime) {
-            super();
-            this.setPriority(MIN_PRIORITY);
-            String channelName = "";
-            if (channel.getName() != null) channelName = "[" + 
channel.getName() + "]";
-            setName("GroupChannel-Heartbeat" + channelName + "-" +inc());
-            setDaemon(true);
-            this.channel = channel;
-            this.sleepTime = sleepTime;
-        }
-        public void stopHeartbeat() {
-            doRun = false;
-            interrupt();
-        }
-
+    public class HeartbeatRunnable implements Runnable {
         @Override
         public void run() {
-            while (doRun) {
-                try {
-                    Thread.sleep(sleepTime);
-                    channel.heartbeat();
-                } catch ( InterruptedException x ) {
-                    // Ignore. Probably triggered by a call to stopHeartbeat().
-                    // In the highly unlikely event it was a different trigger,
-                    // simply ignore it and continue.
-                } catch ( Exception x ) {
-                    
log.error(sm.getString("groupChannel.unable.sendHeartbeat"),x);
-                }//catch
-            }//while
-        }//run
-    }//HeartbeatThread
-
-
+            try {
+                heartbeat();
+            } catch (Exception x) {
+                log.error(sm.getString("groupChannel.unable.sendHeartbeat"), 
x);
+            }
+        }
+    }
 
 }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties 
[UTF-8] (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/LocalStrings.properties 
[UTF-8] Thu Nov  8 10:43:00 2018
@@ -23,4 +23,5 @@ groupChannel.sendFail.noRpcChannelReply=
 groupChannel.optionFlag.conflict=Interceptor option flag conflict: [{0}]
 groupChannel.listener.alreadyExist=Listener already exists:[{0}][{1}]
 groupChannel.unable.sendHeartbeat=Unable to send heartbeat through Tribes 
interceptor stack. Will try to sleep again.
+groupChannel.warn.noUtilityExecutor=No utility executor was set, creating one
 rpcChannel.replyFailed=Unable to send back reply in RpcChannel.

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
 (original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
 Thu Nov  8 10:43:00 2018
@@ -18,6 +18,7 @@ package org.apache.catalina.tribes.group
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
@@ -156,8 +157,9 @@ public class StaticMembershipInterceptor
         if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) 
super.start(Channel.SND_RX_SEQ);
         if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) 
super.start(Channel.SND_TX_SEQ);
         final ChannelInterceptorBase base = this;
+        ScheduledExecutorService executor = getChannel().getUtilityExecutor();
         for (final Member member : members) {
-            Thread t = new Thread() {
+            Runnable r = new Runnable() {
                 @Override
                 public void run() {
                     base.memberAdded(member);
@@ -166,7 +168,7 @@ public class StaticMembershipInterceptor
                     }
                 }
             };
-            t.start();
+            executor.execute(r);
         }
         super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ));
 

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
 (original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java
 Thu Nov  8 10:43:00 2018
@@ -18,14 +18,12 @@
 package org.apache.catalina.tribes.membership;
 
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
 import org.apache.catalina.tribes.MembershipProvider;
 import org.apache.catalina.tribes.MembershipService;
-import org.apache.catalina.tribes.util.ExecutorFactory;
 
 public abstract class MembershipProviderBase implements MembershipProvider {
 
@@ -33,7 +31,7 @@ public abstract class MembershipProvider
     protected MembershipListener membershipListener;
     protected MembershipService service;
     // The event notification executor
-    protected final ExecutorService executor = 
ExecutorFactory.newThreadPool(0, 10, 10, TimeUnit.SECONDS);
+    protected ScheduledExecutorService executor;
 
     @Override
     public void init(Properties properties) throws Exception {
@@ -65,5 +63,6 @@ public abstract class MembershipProvider
     @Override
     public void setMembershipService(MembershipService service) {
         this.service = service;
+        executor = service.getChannel().getUtilityExecutor();
     }
 }
\ No newline at end of file

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1846119&r1=1846118&r2=1846119&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu Nov  8 10:43:00 2018
@@ -77,6 +77,16 @@
       </update>
     </changelog>
   </subsection>
+  <subsection name="Tribes">
+    <changelog>
+      <update>
+        Refactor various operations performed in tribes using a scheduled
+        executor. When tribes is not running standalone, it will use the
+        executor from the Catalina Service. If running independently, the
+        Channel will provide the executor. (remm)
+      </update>
+    </changelog>
+  </subsection>
 </section>
 <section name="Tomcat 9.0.13 (markt)" rtext="2018-11-07">
   <subsection name="Catalina">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to