Steve973 commented on code in PR #11906:
URL: https://github.com/apache/camel/pull/11906#discussion_r1383951383


##########
components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java:
##########
@@ -140,30 +149,87 @@ protected void doInit() throws Exception {
         super.doInit();
         DynamicRouterComponent component = getDynamicRouterComponent();
         if (CONTROL_CHANNEL_NAME.equals(configuration.getChannel())) {
-            final DynamicRouterControlChannelProcessor processor = 
controlChannelProcessorFactorySupplier.get()
-                    .getInstance(component);
-            processor.setConfiguration(configuration);
             try {
                 // There can be multiple control actions, but we do not want to
                 // create another consumer on the control channel, so check to
                 // see if the consumer has already been created, and skip the
                 // creation of another consumer if one already exists
                 if (component.getControlChannelProcessor() == null) {
+                    DynamicRouterControlChannelProcessor processor = 
controlChannelProcessorFactorySupplier.get()
+                            .getInstance(component);
+                    processor.setConfiguration(configuration);
                     component.setControlChannelProcessor(processor);
                 }
             } catch (Exception e) {
                 throw new IllegalStateException("Could not create Dynamic 
Router endpoint", e);
             }
         } else {
-            final DynamicRouterProcessor processor = 
processorFactorySupplier.get()
-                    .getInstance("dynamicRouterProcessor-" + 
configuration.getChannel(), getCamelContext(),
-                            configuration.getRecipientMode(), 
configuration.isWarnDroppedMessage(),
-                            filterProcessorFactorySupplier);
-            ServiceHelper.startService(processor);
+            CamelContext camelContext = getCamelContext();
+            String routeId = configuration.getRouteId();
+            long timeout = configuration.getTimeout();
+            ErrorHandler errorHandler = new NoErrorHandler(null);
+            if (producerCache == null) {
+                producerCache = new DefaultProducerCache(this, camelContext, 
1000);
+            }
+            ExecutorService aggregateExecutorService = 
camelContext.getExecutorServiceManager()
+                    .newScheduledThreadPool(this, 
"DynamicRouter-AggregateTask", 0);
+            if (timeout > 0) {
+                // use a cached thread pool so we each on-the-fly task has a 
dedicated thread to process completions as they come in
+                aggregateExecutorService = 
camelContext.getExecutorServiceManager()
+                        .newScheduledThreadPool(this, 
"DynamicRouter-AggregateTask", 0);
+            }
+            AggregationStrategy aggregationStrategy = 
determineAggregationStrategy(camelContext);
+            DynamicRouterMulticastProcessor processor = 
processorFactorySupplier.get()
+                    .getInstance("DynamicRouterMulticastProcessor-" + 
configuration.getChannel(), camelContext, null,
+                            configuration.getRecipientMode(),
+                            configuration.isWarnDroppedMessage(), 
filterProcessorFactorySupplier, producerCache,
+                            aggregationStrategy, 
configuration.isParallelProcessing(),
+                            determineExecutorService(camelContext), 
configuration.isShutdownExecutorService(),
+                            configuration.isStreaming(), 
configuration.isStopOnException(), timeout,
+                            determineOnPrepare(camelContext), 
configuration.isShareUnitOfWork(),
+                            configuration.isParallelAggregate());
+            processor.setErrorHandler(errorHandler);
+            processor.setAggregateExecutorService(aggregateExecutorService);
+            
processor.setIgnoreInvalidEndpoints(configuration.isIgnoreInvalidEndpoints());
+            processor.setId(getId());
+            processor.setRouteId(routeId);
+            ServiceHelper.startService(aggregationStrategy, producerCache, 
processor);
             component.addRoutingProcessor(configuration.getChannel(), 
processor);
         }
     }
 
+    protected ExecutorService determineExecutorService(CamelContext 
camelContext) {
+        ExecutorService executorService = null;
+        if (ObjectHelper.isNotEmpty(configuration.getExecutorService())) {
+            executorService = camelContext.getExecutorServiceManager()
+                    .newThreadPool(this, "@RecipientList", 
configuration.getExecutorService());
+        }
+        if (configuration.isParallelProcessing() && 
configuration.getExecutorService() == null) {
+            // we are running in parallel, so we need a thread pool
+            executorService = camelContext.getExecutorServiceManager()
+                    .newDefaultThreadPool(this, "@RecipientList");
+        }
+        return executorService;
+    }
+
+    protected AggregationStrategy determineAggregationStrategy(CamelContext 
camelContext) {
+        AggregationStrategy aggregationStrategy = new 
UseLatestAggregationStrategy();
+        if (ObjectHelper.isNotEmpty(configuration.getAggregationStrategy())) {
+            aggregationStrategy = 
CamelContextHelper.mandatoryLookup(camelContext,
+                    configuration.getAggregationStrategy(), 
AggregationStrategy.class);
+        }
+        return aggregationStrategy;

Review Comment:
   Thanks.  I will make the change as requested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to