Author: raulk
Date: Mon Feb  4 01:44:24 2013
New Revision: 1442004

URL: http://svn.apache.org/viewvc?rev=1442004&view=rev
Log:
CAMEL-5974 Allow specifying the default type of TaskExecutor used by the DMLC (camel-jms)

Added:
    
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
      - copied unchanged from r1442002, 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
    
camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
      - copied unchanged from r1442002, 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
Modified:
    camel/branches/camel-2.10.x/components/camel-jms/   (props changed)
    
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
    
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Propchange: camel/branches/camel-2.10.x/components/camel-jms/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Feb  4 01:44:24 2013
@@ -0,0 +1,2 @@
+/camel/trunk:1441998,1442002
+/camel/trunk/components/camel-jms:1359013,1359197,1359226,1359265,1359341,1360031,1360525-1360527,1360581,1360583,1360719,1361116-1361117,1361126,1361477,1361919,1361973,1365013,1365061,1365616,1365917,1365939,1366272,1366321,1369457,1369490,1369671,1370254,1370783,1371570,1371967,1372244-1372245,1372259,1372329,1372354,1372485,1372833,1372845,1372863,1373374,1373402,1373442,1373453,1373691-1373692,1373722,1373751,1373763,1373785,1374110,1374133,1374290,1374332,1374388,1374530,1374539,1374582-1374584,1374639,1374709,1374926,1374949,1375019,1375027,1375439,1375458,1375460,1375900,1375904,1375936,1376088,1376110,1376140,1376375,1376377,1376870,1377243,1377256,1377272,1377395,1377400,1377402,1377407,1377413,1377608,1378476,1378731,1378825,1378827,1379320,1379324,1379354,1379709,1380157,1380232,1381089,1381137,1381182,1381196,1381552,1381898,1382039,1382416,1382423,1382433,1383220,1383824,1383826,1383858,1384122,1384226,1384316,1384369,1384447,1384609,1384799,1384815,1386495,1386607,1386671,1387019-1387020,1387142,1387545,1387808,1388345,1388750,1388762,1389251,1389326,1389813,1389941,1390820,1390946,1391008,1391021,1391044,1391339,1391346,1391373,1391378,1391439,1391466,1391954,1391958,1392188,1392869,1393291,1393294,1393962,1394388,1394408,1394479,1394577,1394932,1394940,1394943,1394991,1395046,1395085,1395226,1395580,1395626,1395645,1395883,1395891,1398047,1398694,1398756,1398760,1398789,1398797,1398810,1399134,1399165,1399317,1399333,1399519,1399664,1399808,1400074,1400097,1400403,1400729,1400734,1400831,1400863,1400876,1401151,1401159-1401166,1401209,1401264,1401578,1401585,1402383,1402485,1402738,1403186,1403202,1403725,1403863,1403971,1403988,1403990,1404015,1404020,1404031,1404056,1404172,1404233,1404291,1404303,1404474,1404490-1404491,1404496,1404508-1404510,1404519,1404532,1404921,1404934,1405313,1405662,1405678,1405727,1405932,1406066,1406948-1406952,1407104,1407395,1407487-1407488,1407736,1407744,1407748,1407750,1407752,1407764,1407768,1407776,1407818,1407826,1407970,1407975,1407978,1407985,1407987,1407992,1407995,1408185-1408186,1408188,1408193,1408262,1408269,1408297,1408306,1408365,1408373,1408660-1408661,1408667,1408670,1408704,1409155,1409215,1409246,1409398,1409424,1409651,1409678,1409689,1409734,1409787,1410217,1410276,1410336,1410346,1410709,1410906,1410952,1410959,1411129,1411217,1411414,1411617,1411664,1412012,1412100,1412106,1412144,1412434,1412473,1412492,1412495,1412499,1412524,1412672,1412774,1412907,1413312,1413317,1413979,1414022,1414564,1414571,1414740,1414836,1414857,1415129,1415534,1415564,1415838,1415884,1415888,1415979,1416006,1416246,1417318-1417330,1418239,1418595,1418616,1418708,1419305,1419415,1419629,1419655,1419729,1419757,1420038,1420051,1420112,1420523-1420524,1420534,1420610,1421306,1421712,1421723,1421764,1421864,1422241,1422568,1422792,1422807,1422957,1422963,1423299,1423301,1423304,1423797,1423808,1423929,1424386,1424398,1424437,1424915,1425102,1425106,1425262,1425295,1425445,1425653,1425724,1426385,1426459,1426748,1427806,1427815,1427825,1427853,1427989,1428463,1429218,1429271,1429280,1429282,1429477,1429527,1429529,1429707,1430724,1430731,1431152,1431273,1431280,1431291,1431297,1431336,1431626,1431642,1431652,1431705,1431781,1431784,1431813,1431819-1431820,1431891,1431902,1431933,1432413,1432435,1432629,1432632,1433518-1433519,1433855,1433891,1435219,1435497,1435580,1435700,1435812,1435861,1436244,1436308,1436384,1436389,1436420,1436438,1436441,1436487,1436505,1436603,1436619,1436759,1436904,1436944,1436980,1437139,1437158,1437160,1437208,1437283,1438017,1438063,1438363,1438380,1438499-1438500,1438662,1438880,1439433,1439451,1439468,1439473,1439639,1439771,1440280,1440328,1440360,1441387,1441392,1441508,1441518,1441876,1441879,1442002

Modified: 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442004&r1=1442003&r2=1442004&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
 Mon Feb  4 01:44:24 2013
@@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent.
 import org.springframework.core.task.SimpleAsyncTaskExecutor;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 /**
  * The default {@link DefaultMessageListenerContainer container} which listen 
for messages
@@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo
 
     /**
      * Create a default TaskExecutor. Called if no explicit TaskExecutor has 
been specified.
-     * <p>The default implementation builds a {@link 
org.springframework.core.task.SimpleAsyncTaskExecutor}
-     * with the specified bean name and using Camel's {@link 
org.apache.camel.spi.ExecutorServiceManager}
+     * <p />
+     * The type of {@link TaskExecutor} will depend on the value of
+     * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more 
details, refer to the Javadoc of
+     * {@link DefaultTaskExecutorType}.
+     * <p />
+     * In all cases, it uses the specified bean name and Camel's {@link 
org.apache.camel.spi.ExecutorServiceManager}
      * to resolve the thread name.
-     * @see 
org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
+     * @see 
JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType)
+     * @see ThreadPoolTaskExecutor#setBeanName(String)
      */
     @Override
     protected TaskExecutor createDefaultTaskExecutor() {
         String pattern = 
endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
-        String beanName = getBeanName();
+        String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName();
 
-        SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
-        answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, 
true));
-        return answer;
+        if (endpoint.getDefaultTaskExecutorType() == 
DefaultTaskExecutorType.ThreadPool) {
+            ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
+            answer.setBeanName(beanName);
+            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, 
true));
+            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
+            // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
+            // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by
+            // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as
+            // defined by maxConcurrentConsumers).
+            answer.setQueueCapacity(0);
+            answer.initialize();
+            return answer;
+        } else {
+            SimpleAsyncTaskExecutor answer = new 
SimpleAsyncTaskExecutor(beanName);
+            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, 
true));
+            return answer;
+        }
     }
-
+    
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442004&r1=1442003&r2=1442004&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 Mon Feb  4 01:44:24 2013
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.Session;
@@ -184,7 +185,7 @@ public class JmsComponent extends Defaul
     public void setCacheLevelName(String cacheName) {
         getConfiguration().setCacheLevelName(cacheName);
     }
-    
+
     public void setReplyToCacheLevelName(String cacheName) {
         getConfiguration().setReplyToCacheLevelName(cacheName);
     }
@@ -236,7 +237,7 @@ public class JmsComponent extends Defaul
     public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
         getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
     }
-    
+
     public void setIdleConsumerLimit(int idleConsumerLimit) {
         getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
     }

Modified: 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442004&r1=1442003&r2=1442004&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 Mon Feb  4 01:44:24 2013
@@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan
 import static 
org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
 
 /**
- * @version 
+ * @version
  */
 public class JmsConfiguration implements Cloneable {
 
@@ -137,6 +137,7 @@ public class JmsConfiguration implements
     private boolean allowNullBody = true;
     private MessageListenerContainerFactory messageListenerContainerFactory;
     private boolean includeSentJMSMessageID;
+    private DefaultTaskExecutorType defaultTaskExecutorType;
 
     public JmsConfiguration() {
     }
@@ -386,7 +387,7 @@ public class JmsConfiguration implements
         case Default:
             return new DefaultJmsMessageListenerContainer(endpoint);
         case Custom:
-            return getCustomMessageListenerContainer(endpoint);            
+            return getCustomMessageListenerContainer(endpoint);
         default:
             throw new IllegalArgumentException("Unknown consumer type: " + 
consumerType);
         }
@@ -1313,4 +1314,16 @@ public class JmsConfiguration implements
     public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) {
         this.includeSentJMSMessageID = includeSentJMSMessageID;
     }
+
+    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
+        return defaultTaskExecutorType;
+    }
+
+    /**
+     * Indicates what type of {@link TaskExecutor} to use by default for JMS 
consumers.
+     * Refer to the documentation of {@link DefaultTaskExecutorType} for 
available options.
+     */
+    public void setDefaultTaskExecutorType(DefaultTaskExecutorType 
defaultTaskExecutorType) {
+        this.defaultTaskExecutorType = defaultTaskExecutorType;
+    }
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442004&r1=1442003&r2=1442004&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 Mon Feb  4 01:44:24 2013
@@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan
 /**
  * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
  *
- * @version 
+ * @version
  */
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
@@ -176,24 +176,32 @@ public class JmsEndpoint extends Default
         listenerContainer.setPubSubDomain(pubSubDomain);
 
         // include destination name as part of thread and transaction name
-        String consumerName = "JmsConsumer[" + 
getEndpointConfiguredDestinationName() + "]";
+        String consumerName = getThreadName();
 
         if (configuration.getTaskExecutor() != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Using custom TaskExecutor: {} on listener 
container: {}", configuration.getTaskExecutor(), listenerContainer);
             }
             setContainerTaskExecutor(listenerContainer, 
configuration.getTaskExecutor());
-        } else {
+        } else if ((listenerContainer instanceof 
DefaultJmsMessageListenerContainer && 
configuration.getDefaultTaskExecutorType() == null)
+                || !(listenerContainer instanceof 
DefaultJmsMessageListenerContainer)) {
+            // preserve backwards compatibility if an explicit Default 
TaskExecutor Type was not set;
+            // otherwise, defer the creation of the TaskExecutor
             // use a cached pool as DefaultMessageListenerContainer will 
throttle pool sizing
             ExecutorService executor = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, 
consumerName);
             setContainerTaskExecutor(listenerContainer, executor);
+        } else {
+            // do nothing, as we're working with a 
DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType,
+            // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor 
will handle the creation
+            log.debug("Deferring creation of TaskExecutor for listener 
container: {} as per policy: {}", 
+                    listenerContainer, 
configuration.getDefaultTaskExecutorType());
         }
-        
+
         // set a default transaction name if none provided
         if (configuration.getTransactionName() == null) {
             if (listenerContainer instanceof DefaultMessageListenerContainer) {
                 ((DefaultMessageListenerContainer) 
listenerContainer).setTransactionName(consumerName);
-            }            
+            }
         }
     }
 
@@ -271,6 +279,10 @@ public class JmsEndpoint extends Default
         return true;
     }
 
+    public String getThreadName() {
+        return "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
+    }
+    
     // Properties
     // 
-------------------------------------------------------------------------
 
@@ -448,7 +460,7 @@ public class JmsEndpoint extends Default
     public String getCacheLevelName() {
         return getConfiguration().getCacheLevelName();
     }
-    
+
     @ManagedAttribute
     public String getReplyToCacheLevelName() {
         return getConfiguration().getReplyToCacheLevelName();
@@ -489,7 +501,7 @@ public class JmsEndpoint extends Default
     public LoggingLevel getErrorHandlerLoggingLevel() {
         return getConfiguration().getErrorHandlerLoggingLevel();
     }
-    
+
     @ManagedAttribute
     public boolean isErrorHandlerLogStackTrace() {
         return getConfiguration().isErrorHandlerLogStackTrace();
@@ -509,7 +521,7 @@ public class JmsEndpoint extends Default
     public int getIdleConsumerLimit() {
         return getConfiguration().getIdleConsumerLimit();
     }
-    
+
     public JmsOperations getJmsOperations() {
         return getConfiguration().getJmsOperations();
     }
@@ -724,7 +736,7 @@ public class JmsEndpoint extends Default
     public void setCacheLevelName(String cacheName) {
         getConfiguration().setCacheLevelName(cacheName);
     }
-    
+
     @ManagedAttribute
     public void setReplyToCacheLevelName(String cacheName) {
         getConfiguration().setReplyToCacheLevelName(cacheName);
@@ -795,7 +807,7 @@ public class JmsEndpoint extends Default
     public void setIdleConsumerLimit(int idleConsumerLimit) {
         getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
     }
-    
+
     public void setJmsOperations(JmsOperations jmsOperations) {
         getConfiguration().setJmsOperations(jmsOperations);
     }
@@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default
     public void setAllowNullBody(boolean allowNullBody) {
         configuration.setAllowNullBody(allowNullBody);
     }
-    
+
     @ManagedAttribute
     public boolean isIncludeSentJMSMessageID() {
         return configuration.isIncludeSentJMSMessageID();
@@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default
         configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID);
     }
 
+    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
+        return configuration.getDefaultTaskExecutorType();
+    }
+
+    public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) {
+        configuration.setDefaultTaskExecutorType(type);
+    }
+
     public MessageListenerContainerFactory 
getMessageListenerContainerFactory() {
         return configuration.getMessageListenerContainerFactory();
     }


Reply via email to