Steve973 commented on code in PR #11906: URL: https://github.com/apache/camel/pull/11906#discussion_r1383951383
########## components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java: ########## @@ -140,30 +149,87 @@ protected void doInit() throws Exception { super.doInit(); DynamicRouterComponent component = getDynamicRouterComponent(); if (CONTROL_CHANNEL_NAME.equals(configuration.getChannel())) { - final DynamicRouterControlChannelProcessor processor = controlChannelProcessorFactorySupplier.get() - .getInstance(component); - processor.setConfiguration(configuration); try { // There can be multiple control actions, but we do not want to // create another consumer on the control channel, so check to // see if the consumer has already been created, and skip the // creation of another consumer if one already exists if (component.getControlChannelProcessor() == null) { + DynamicRouterControlChannelProcessor processor = controlChannelProcessorFactorySupplier.get() + .getInstance(component); + processor.setConfiguration(configuration); component.setControlChannelProcessor(processor); } } catch (Exception e) { throw new IllegalStateException("Could not create Dynamic Router endpoint", e); } } else { - final DynamicRouterProcessor processor = processorFactorySupplier.get() - .getInstance("dynamicRouterProcessor-" + configuration.getChannel(), getCamelContext(), - configuration.getRecipientMode(), configuration.isWarnDroppedMessage(), - filterProcessorFactorySupplier); - ServiceHelper.startService(processor); + CamelContext camelContext = getCamelContext(); + String routeId = configuration.getRouteId(); + long timeout = configuration.getTimeout(); + ErrorHandler errorHandler = new NoErrorHandler(null); + if (producerCache == null) { + producerCache = new DefaultProducerCache(this, camelContext, 1000); + } + ExecutorService aggregateExecutorService = camelContext.getExecutorServiceManager() + .newScheduledThreadPool(this, "DynamicRouter-AggregateTask", 0); + if (timeout > 0) { + // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in + aggregateExecutorService = camelContext.getExecutorServiceManager() + .newScheduledThreadPool(this, "DynamicRouter-AggregateTask", 0); + } + AggregationStrategy aggregationStrategy = determineAggregationStrategy(camelContext); + DynamicRouterMulticastProcessor processor = processorFactorySupplier.get() + .getInstance("DynamicRouterMulticastProcessor-" + configuration.getChannel(), camelContext, null, + configuration.getRecipientMode(), + configuration.isWarnDroppedMessage(), filterProcessorFactorySupplier, producerCache, + aggregationStrategy, configuration.isParallelProcessing(), + determineExecutorService(camelContext), configuration.isShutdownExecutorService(), + configuration.isStreaming(), configuration.isStopOnException(), timeout, + determineOnPrepare(camelContext), configuration.isShareUnitOfWork(), + configuration.isParallelAggregate()); + processor.setErrorHandler(errorHandler); + processor.setAggregateExecutorService(aggregateExecutorService); + processor.setIgnoreInvalidEndpoints(configuration.isIgnoreInvalidEndpoints()); + processor.setId(getId()); + processor.setRouteId(routeId); + ServiceHelper.startService(aggregationStrategy, producerCache, processor); component.addRoutingProcessor(configuration.getChannel(), processor); } } + protected ExecutorService determineExecutorService(CamelContext camelContext) { + ExecutorService executorService = null; + if (ObjectHelper.isNotEmpty(configuration.getExecutorService())) { + executorService = camelContext.getExecutorServiceManager() + .newThreadPool(this, "@RecipientList", configuration.getExecutorService()); + } + if (configuration.isParallelProcessing() && configuration.getExecutorService() == null) { + // we are running in parallel, so we need a thread pool + executorService = camelContext.getExecutorServiceManager() + .newDefaultThreadPool(this, "@RecipientList"); + } + return executorService; + } + + protected AggregationStrategy determineAggregationStrategy(CamelContext camelContext) { + AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy(); + if (ObjectHelper.isNotEmpty(configuration.getAggregationStrategy())) { + aggregationStrategy = CamelContextHelper.mandatoryLookup(camelContext, + configuration.getAggregationStrategy(), AggregationStrategy.class); + } + return aggregationStrategy; Review Comment: Thanks. I will make the change as requested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org