This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit e1c93a38abf6ad225f5b64c0bc768e1e134f8543 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jan 24 14:57:06 2020 +0100 CAMEL-14435: Optimize core --- .../workitem/InOnlyCamelWorkItemHandlerTest.java | 101 ------ .../workitem/InOutCamelWorkItemHandlerTest.java | 382 --------------------- .../camel/component/kafka/KafkaProducerTest.java | 6 +- .../apache/camel/component/mock/MockEndpoint.java | 7 +- .../openstack/AbstractProducerTestSupport.java | 6 +- .../main/java/org/apache/camel/CamelContext.java | 22 -- .../org/apache/camel/ExtendedCamelContext.java | 21 ++ .../engine/DefaultAsyncProcessorAwaitManager.java | 3 +- .../camel/processor/CamelInternalProcessor.java | 5 +- .../org/apache/camel/processor/LoopProcessor.java | 15 +- .../apache/camel/processor/MulticastProcessor.java | 16 +- .../java/org/apache/camel/processor/Pipeline.java | 3 +- .../processor/SharedCamelInternalProcessor.java | 2 +- .../org/apache/camel/processor/TryProcessor.java | 13 +- .../processor/aggregate/AggregateProcessor.java | 6 +- .../errorhandler/RedeliveryErrorHandler.java | 2 +- .../loadbalancer/FailOverLoadBalancer.java | 5 +- .../processor/loadbalancer/TopicLoadBalancer.java | 5 +- .../java/org/apache/camel/reifier/LoopReifier.java | 2 +- .../java/org/apache/camel/reifier/TryReifier.java | 2 +- .../core/xml/AbstractCamelContextFactoryBean.java | 4 +- .../impl/CustomHeadersMapFactoryRouteTest.java | 5 +- .../impl/HashMapHeadersMapFactoryRouteTest.java | 3 +- .../headersmap/CamelFastHeadersMapTest.java | 3 +- .../management/mbean/ManagedCamelContext.java | 2 +- .../org/apache/camel/support/DefaultExchange.java | 3 +- .../org/apache/camel/support/DefaultMessage.java | 7 +- 27 files changed, 101 insertions(+), 550 deletions(-) diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java deleted file mode 100644 index 820b50d..0000000 --- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.jbpm.workitem; - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.component.jbpm.JBPMConstants; -import org.apache.camel.impl.engine.DefaultHeadersMapFactory; -import org.apache.camel.spi.HeadersMapFactory; -import org.drools.core.process.instance.impl.WorkItemImpl; -import org.jbpm.process.workitem.core.TestWorkItemManager; -import org.jbpm.services.api.service.ServiceRegistry; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.kie.api.runtime.manager.RuntimeManager; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class InOnlyCamelWorkItemHandlerTest { - - @Mock - ProducerTemplate producerTemplate; - - @Mock - Exchange outExchange; - - @Mock - Message outMessage; - - @Mock - CamelContext camelContext; - - @Mock - RuntimeManager runtimeManager; - - @Test - public void testExecuteInOnlyLocalCamelContext() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - // Register the RuntimeManager bound camelcontext. - try { - ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOnlyCamelWorkItemHandler(runtimeManager); - - TestWorkItemManager manager = new TestWorkItemManager(); - handler.executeWorkItem(workItem, - manager); - assertThat(manager.getResults(), is(notNullValue())); - // InOnly does not complete WorkItem. - assertThat(manager.getResults().size(), equalTo(0)); - } finally { - ServiceRegistry.get().remove(runtimeManagerId + "_CamelService"); - } - } -} diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java deleted file mode 100644 index 32a967d..0000000 --- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.jbpm.workitem; - -import java.util.Map; - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.component.jbpm.JBPMConstants; -import org.apache.camel.impl.engine.DefaultHeadersMapFactory; -import org.apache.camel.spi.HeadersMapFactory; -import org.drools.core.process.instance.impl.WorkItemImpl; -import org.jbpm.bpmn2.handler.WorkItemHandlerRuntimeException; -import org.jbpm.process.workitem.core.TestWorkItemManager; -import org.jbpm.services.api.service.ServiceRegistry; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.kie.api.runtime.manager.RuntimeManager; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class InOutCamelWorkItemHandlerTest { - - @Mock - ProducerTemplate producerTemplate; - - @Mock - Exchange outExchange; - - @Mock - Message outMessage; - - @Mock - CamelContext camelContext; - - @Mock - RuntimeManager runtimeManager; - - @Test - public void testExecuteInOutGlobalCamelContext() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - when(outExchange.getOut()).thenReturn(outMessage); - when(outMessage.getBody()).thenReturn(testReponse); - - try { - ServiceRegistry.get().register("GlobalCamelService", camelContext); - - TestWorkItemManager manager = new TestWorkItemManager(); - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter("CamelEndpointId", camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(); - - handler.executeWorkItem(workItem, manager); - assertThat(manager.getResults(), is(notNullValue())); - assertThat(manager.getResults().size(), equalTo(1)); - assertThat(manager.getResults().containsKey(workItem.getId()), is(true)); - Map<String, Object> results = manager.getResults(workItem.getId()); - assertThat(results.size(), equalTo(2)); - assertThat(results.get("Response"), equalTo(testReponse)); - - } finally { - ServiceRegistry.get().remove("GlobalCamelService"); - } - - } - - @Test - public void testExecuteInOutLocalCamelContext() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - when(outExchange.getOut()).thenReturn(outMessage); - when(outMessage.getBody()).thenReturn(testReponse); - - // Register the RuntimeManager bound camelcontext. - try { - ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager); - - TestWorkItemManager manager = new TestWorkItemManager(); - handler.executeWorkItem(workItem, manager); - assertThat(manager.getResults(), is(notNullValue())); - assertThat(manager.getResults().size(), equalTo(1)); - assertThat(manager.getResults().containsKey(workItem.getId()), is(true)); - - Map<String, Object> results = manager.getResults(workItem.getId()); - assertThat(results.size(), equalTo(2)); - assertThat(results.get(JBPMConstants.RESPONSE_WI_PARAM), equalTo(testReponse)); - } finally { - ServiceRegistry.get().remove(runtimeManagerId + "_CamelService"); - } - } - - @Test - public void testExecuteInOutLocalCamelContextLazyInit() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - when(outExchange.getOut()).thenReturn(outMessage); - when(outMessage.getBody()).thenReturn(testReponse); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager); - - // Register the context after we've created the WIH to test lazy-init. - try { - ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext); - - TestWorkItemManager manager = new TestWorkItemManager(); - handler.executeWorkItem(workItem, manager); - assertThat(manager.getResults(), is(notNullValue())); - assertThat(manager.getResults().size(), equalTo(1)); - assertThat(manager.getResults().containsKey(workItem.getId()), is(true)); - - Map<String, Object> results = manager.getResults(workItem.getId()); - assertThat(results.size(), equalTo(2)); - assertThat(results.get(JBPMConstants.RESPONSE_WI_PARAM), equalTo(testReponse)); - } finally { - ServiceRegistry.get().remove(runtimeManagerId + "_CamelService"); - } - } - - @Test(expected = IllegalArgumentException.class) - public void testExecuteInOutLocalCamelContextLazyInitFail() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager); - - TestWorkItemManager manager = new TestWorkItemManager(); - // This is expected to throw an exception. - handler.executeWorkItem(workItem, manager); - - } - - @Test - public void testExecuteInOutLocalCamelContextDefaultHandleException() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - //Throw an error back to the WIH - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new ToBeHandledException()); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - // Register the RuntimeManager bound camelcontext. - try { - ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager); - - TestWorkItemManager manager = new TestWorkItemManager(); - try { - handler.executeWorkItem(workItem, manager); - throw new RuntimeException("The test expects an exception. This code should never be reached."); - } catch (Throwable wihRe) { - assertThat(wihRe, is(instanceOf(WorkItemHandlerRuntimeException.class))); - assertThat(wihRe.getCause(), is(instanceOf(ToBeHandledException.class))); - } - - } finally { - ServiceRegistry.get().remove(runtimeManagerId + "_CamelService"); - } - } - - @Test - public void testExecuteInOutLocalCamelContextExplicitHandleException() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - //Throw an error back to the WIH - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new ToBeHandledException()); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - // Register the RuntimeManager bound camelcontext. - try { - ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setParameter("HandleExceptions", true); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager); - - TestWorkItemManager manager = new TestWorkItemManager(); - try { - handler.executeWorkItem(workItem, manager); - throw new RuntimeException("The test expects an exception. This code should never be reached."); - } catch (Throwable wihRe) { - assertThat(wihRe, is(instanceOf(WorkItemHandlerRuntimeException.class))); - assertThat(wihRe.getCause(), is(instanceOf(ToBeHandledException.class))); - } - - } finally { - ServiceRegistry.get().remove(runtimeManagerId + "_CamelService"); - } - } - - @Test - public void testExecuteInOutLocalCamelContextNotHandleException() throws Exception { - - String camelEndpointId = "testCamelRoute"; - String camelRouteUri = "direct:" + camelEndpointId; - - String testReponse = "testResponse"; - - String runtimeManagerId = "testRuntimeManager"; - - when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId); - - //Throw an error back to the WIH - when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new NotToBeHandledException()); - when(producerTemplate.getCamelContext()).thenReturn(camelContext); - - when(camelContext.createProducerTemplate()).thenReturn(producerTemplate); - HeadersMapFactory hmf = new DefaultHeadersMapFactory(); - when(camelContext.getHeadersMapFactory()).thenReturn(hmf); - - // Register the RuntimeManager bound camelcontext. - try { - ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext); - - WorkItemImpl workItem = new WorkItemImpl(); - workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId); - workItem.setParameter("Request", "someRequest"); - workItem.setParameter("HandleExceptions", false); - workItem.setDeploymentId("testDeploymentId"); - workItem.setProcessInstanceId(1L); - workItem.setId(1L); - - AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager); - - TestWorkItemManager manager = new TestWorkItemManager(); - try { - handler.executeWorkItem(workItem, manager); - throw new RuntimeException("The test expects an exception. This code should never be reached."); - } catch (Throwable wihRe) { - assertThat(wihRe, is(instanceOf(NotToBeHandledException.class))); - } - - } finally { - ServiceRegistry.get().remove(runtimeManagerId + "_CamelService"); - } - } - - public static class ToBeHandledException extends RuntimeException { - } - - public static class NotToBeHandledException extends RuntimeException { - } - -} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 4e94753..a982e72 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -29,6 +29,7 @@ import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; import org.apache.camel.TypeConverter; import org.apache.camel.impl.DefaultCamelContext; @@ -63,7 +64,7 @@ public class KafkaProducerTest { private TypeConverter converter = Mockito.mock(TypeConverter.class); private CamelContext context = Mockito.mock(CamelContext.class); private Exchange exchange = Mockito.mock(Exchange.class); - private CamelContext camelContext = Mockito.mock(CamelContext.class); + private ExtendedCamelContext camelContext = Mockito.mock(ExtendedCamelContext.class); private Message in = new DefaultMessage(camelContext); private Message out = new DefaultMessage(camelContext); private AsyncCallback callback = Mockito.mock(AsyncCallback.class); @@ -87,7 +88,8 @@ public class KafkaProducerTest { Mockito.when(exchange.getContext()).thenReturn(context); Mockito.when(context.getTypeConverter()).thenReturn(converter); Mockito.when(converter.tryConvertTo(String.class, exchange, null)).thenReturn(null); - Mockito.when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory()); + Mockito.when(camelContext.adapt(ExtendedCamelContext.class)).thenReturn(camelContext); + Mockito.when(camelContext.adapt(ExtendedCamelContext.class).getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory()); Mockito.when(camelContext.getTypeConverter()).thenReturn(converter); producer.setKafkaProducer(kp); diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java index 4d68bc0..5557825 100644 --- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java +++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java @@ -38,6 +38,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Handler; import org.apache.camel.Message; import org.apache.camel.Predicate; @@ -548,7 +549,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, expectedMinimumMessageCount(1); } if (expectedHeaderValues == null) { - expectedHeaderValues = getCamelContext().getHeadersMapFactory().newMap(); + expectedHeaderValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(); // we just wants to expects to be called once expects(new AssertionTask() { @Override @@ -1565,7 +1566,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (expectedHeaderValues != null) { if (actualHeaderValues == null) { - actualHeaderValues = getCamelContext().getHeadersMapFactory().newMap(); + actualHeaderValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(); } if (in.hasHeaders()) { actualHeaderValues.putAll(in.getHeaders()); @@ -1574,7 +1575,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (expectedPropertyValues != null) { if (actualPropertyValues == null) { - actualPropertyValues = getCamelContext().getHeadersMapFactory().newMap(); + actualPropertyValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(); } actualPropertyValues.putAll(copy.getProperties()); } diff --git a/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java b/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java index 86aeddf..8406e84 100644 --- a/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java +++ b/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; import org.apache.camel.Producer; import org.apache.camel.impl.engine.DefaultHeadersMapFactory; @@ -42,7 +43,7 @@ public abstract class AbstractProducerTestSupport { protected Exchange exchange; @Mock - protected CamelContext camelContext; + protected ExtendedCamelContext camelContext; protected Message msg; @@ -52,6 +53,7 @@ public abstract class AbstractProducerTestSupport { public void before() throws IOException { msg = new DefaultMessage(camelContext); when(exchange.getIn()).thenReturn(msg); - when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory()); + when(camelContext.adapt(ExtendedCamelContext.class)).thenReturn(camelContext); + when(camelContext.adapt(ExtendedCamelContext.class).getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory()); } } diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index 6b0396f..eb9dd95 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -25,7 +25,6 @@ import org.apache.camel.spi.DataType; import org.apache.camel.spi.Debugger; import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.ExecutorServiceManager; -import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.Injector; import org.apache.camel.spi.Language; @@ -34,7 +33,6 @@ import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.PropertiesComponent; -import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RestConfiguration; import org.apache.camel.spi.RestRegistry; @@ -1272,24 +1270,4 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration { */ SSLContextParameters getSSLContextParameters(); - /** - * Gets the {@link HeadersMapFactory} to use. - */ - HeadersMapFactory getHeadersMapFactory(); - - /** - * Sets a custom {@link HeadersMapFactory} to be used. - */ - void setHeadersMapFactory(HeadersMapFactory factory); - - /** - * Gets the {@link ReactiveExecutor} to use. - */ - ReactiveExecutor getReactiveExecutor(); - - /** - * Sets a custom {@link ReactiveExecutor} to be used. - */ - void setReactiveExecutor(ReactiveExecutor reactiveExecutor); - } diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 7e38f0f..22efe7d 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -34,6 +34,7 @@ import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; +import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.LogListener; @@ -43,6 +44,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; import org.apache.camel.spi.ProcessorFactory; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.spi.UnitOfWorkFactory; @@ -359,4 +361,23 @@ public interface ExtendedCamelContext extends CamelContext { */ void setBeanIntrospection(BeanIntrospection beanIntrospection); + /** + * Gets the {@link HeadersMapFactory} to use. + */ + HeadersMapFactory getHeadersMapFactory(); + + /** + * Sets a custom {@link HeadersMapFactory} to be used. + */ + void setHeadersMapFactory(HeadersMapFactory factory); + + /** + * Gets the {@link ReactiveExecutor} to use. + */ + ReactiveExecutor getReactiveExecutor(); + + /** + * Sets a custom {@link ReactiveExecutor} to be used. + */ + void setReactiveExecutor(ReactiveExecutor reactiveExecutor); } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java index 7631456..fb67842 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StaticService; @@ -87,7 +88,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements } public void await(Exchange exchange, CountDownLatch latch) { - ReactiveExecutor reactiveExecutor = exchange.getContext().getReactiveExecutor(); + ReactiveExecutor reactiveExecutor = exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor(); // Early exit for pending reactive queued work do { if (latch.getCount() <= 0) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 527a972..783fd2e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -55,7 +55,6 @@ import org.apache.camel.spi.Transformer; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.spi.UnitOfWorkFactory; import org.apache.camel.support.CamelContextHelper; -import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.OrderedComparator; import org.apache.camel.support.SynchronizationAdapter; @@ -106,14 +105,14 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { public CamelInternalProcessor(CamelContext camelContext) { this.camelContext = camelContext; - this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.shutdownStrategy = camelContext.getShutdownStrategy(); } public CamelInternalProcessor(CamelContext camelContext, Processor processor) { super(processor); this.camelContext = camelContext; - this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.shutdownStrategy = camelContext.getShutdownStrategy(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java index aae38ee..a70d550 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -17,13 +17,16 @@ package org.apache.camel.processor; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -42,12 +45,16 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, private String id; private String routeId; + private final CamelContext camelContext; + private final ReactiveExecutor reactiveExecutor; private final Expression expression; private final Predicate predicate; private final boolean copy; - public LoopProcessor(Processor processor, Expression expression, Predicate predicate, boolean copy) { + public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate, boolean copy) { super(processor); + this.camelContext = camelContext; + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.expression = expression; this.predicate = predicate; this.copy = copy; @@ -59,9 +66,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, LoopState state = new LoopState(exchange, callback); if (exchange.isTransacted()) { - exchange.getContext().getReactiveExecutor().scheduleSync(state); + reactiveExecutor.scheduleSync(state); } else { - exchange.getContext().getReactiveExecutor().scheduleMain(state); + reactiveExecutor.scheduleMain(state); } return false; } catch (Exception e) { @@ -117,7 +124,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, processor.process(current, doneSync -> { // increment counter after done index++; - exchange.getContext().getReactiveExecutor().schedule(this); + reactiveExecutor.schedule(this); }); } else { // we are done so prepare the result diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 7a4c0bd..82bafea 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -43,6 +43,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -50,6 +51,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.UnitOfWork; @@ -145,6 +147,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat protected final Processor onPrepare; private final CamelContext camelContext; + private final ReactiveExecutor reactiveExecutor; private String id; private String routeId; private Collection<Processor> processors; @@ -182,6 +185,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat boolean parallelAggregate, boolean stopOnAggregateException) { notNull(camelContext, "camelContext"); this.camelContext = camelContext; + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.processors = processors; this.aggregationStrategy = aggregationStrategy; this.executorService = executorService; @@ -246,12 +250,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat MulticastState state = new MulticastState(exchange, pairs, callback); if (isParallelProcessing()) { - executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state)); + executorService.submit(() -> reactiveExecutor.schedule(state)); } else { if (exchange.isTransacted()) { - exchange.getContext().getReactiveExecutor().scheduleSync(state); + reactiveExecutor.scheduleSync(state); } else { - exchange.getContext().getReactiveExecutor().scheduleMain(state); + reactiveExecutor.scheduleMain(state); } } @@ -263,9 +267,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat protected void schedule(Runnable runnable) { if (isParallelProcessing()) { - executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable)); + executorService.submit(() -> reactiveExecutor.schedule(runnable)); } else { - camelContext.getReactiveExecutor().schedule(runnable); + reactiveExecutor.schedule(runnable); } } @@ -552,7 +556,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); } - camelContext.getReactiveExecutor().schedule(callback); + reactiveExecutor.schedule(callback); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 3cfba1b..350c5e5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -26,6 +26,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; @@ -58,7 +59,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo public Pipeline(CamelContext camelContext, Collection<Processor> processors) { this.camelContext = camelContext; - this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList()); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 37d457c..5e23568 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -81,7 +81,7 @@ public class SharedCamelInternalProcessor { public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) { this.camelContext = camelContext; - this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); this.shutdownStrategy = camelContext.getShutdownStrategy(); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java index b29a275..cd08f60 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -22,11 +22,14 @@ import java.util.List; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; @@ -42,13 +45,17 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class); + protected final CamelContext camelContext; + protected final ReactiveExecutor reactiveExecutor; protected String id; protected String routeId; protected final Processor tryProcessor; protected final List<Processor> catchClauses; protected final Processor finallyProcessor; - public TryProcessor(Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) { + public TryProcessor(CamelContext camelContext, Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) { + this.camelContext = camelContext; + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.tryProcessor = tryProcessor; this.catchClauses = catchClauses; this.finallyProcessor = finallyProcessor; @@ -66,7 +73,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc @Override public boolean process(Exchange exchange, AsyncCallback callback) { - exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback)); + reactiveExecutor.schedule(new TryState(exchange, callback)); return false; } @@ -95,7 +102,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this)); + async.process(exchange, doneSync -> reactiveExecutor.schedule(this)); } else { ExchangeHelper.prepareOutToIn(exchange); exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 9eac797..a8bde2d 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -42,6 +42,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.NoSuchEndpointException; @@ -55,6 +56,7 @@ import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.OptimisticLockingAggregationRepository; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RecoverableAggregationRepository; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.ShutdownAware; @@ -106,6 +108,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat private volatile Lock lock; private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean(); private final CamelContext camelContext; + private final ReactiveExecutor reactiveExecutor; private final AsyncProcessor processor; private String id; private String routeId; @@ -259,6 +262,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); ObjectHelper.notNull(executorService, "executorService"); this.camelContext = camelContext; + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.processor = processor; this.correlationExpression = correlationExpression; this.aggregationStrategy = aggregationStrategy; @@ -857,7 +861,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat // send this exchange // the call to schedule last if needed to ensure in-order processing of the aggregates - executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> { + executorService.submit(() -> reactiveExecutor.scheduleSync(() -> processor.process(exchange, done -> { // log exception if there was a problem if (exchange.getException() != null) { // if there was an exception then let the exception handler handle it diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 32d61bc..e9bb3bb 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -102,7 +102,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); this.camelContext = camelContext; - this.reactiveExecutor = camelContext.getReactiveExecutor(); + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); this.shutdownStrategy = camelContext.getShutdownStrategy(); this.redeliveryProcessor = redeliveryProcessor; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index 3f1a15e..286ea75 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -25,6 +25,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Traceable; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.ExchangeHelper; @@ -164,7 +165,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { AsyncProcessor[] processors = doGetProcessors(); - exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run); + exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(new State(exchange, callback, processors)::run); return false; } @@ -251,7 +252,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab // process the exchange LOG.debug("Processing failover at attempt {} for {}", attempts, copy); - processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run)); + processor.process(copy, doneSync -> exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(this::run)); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java index e8171b7..433a9fd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java @@ -19,6 +19,7 @@ package org.apache.camel.processor.loadbalancer; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; /** @@ -33,7 +34,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport { @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { AsyncProcessor[] processors = doGetProcessors(); - exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run); + exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(new State(exchange, callback, processors)::run); return false; } @@ -64,7 +65,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport { exchange.setException(current.getException()); callback.done(false); } else { - exchange.getContext().getReactiveExecutor().schedule(this::run); + exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(this::run); } } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java index 434abc0..464599a 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java @@ -43,7 +43,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> { } else { expression = definition.getExpression().createExpression(routeContext); } - return new LoopProcessor(output, expression, predicate, isCopy); + return new LoopProcessor(routeContext.getCamelContext(), output, expression, predicate, isCopy); } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java index 4809de5..1a88847 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java @@ -59,7 +59,7 @@ public class TryReifier extends ProcessorReifier<TryDefinition> { throw new IllegalArgumentException("doTry must have one or more catch or finally blocks on " + this); } - return new TryProcessor(tryProcessor, catchProcessors, finallyProcessor); + return new TryProcessor(routeContext.getCamelContext(), tryProcessor, catchProcessors, finallyProcessor); } } diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 4cc4729..e529703 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -260,7 +260,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex HeadersMapFactory headersMapFactory = getBeanForType(HeadersMapFactory.class); if (headersMapFactory != null) { LOG.info("Using custom HeadersMapFactory: {}", headersMapFactory); - getContext().setHeadersMapFactory(headersMapFactory); + getContext().adapt(ExtendedCamelContext.class).setHeadersMapFactory(headersMapFactory); } JSonSchemaResolver jsonSchemaResolver = getBeanForType(JSonSchemaResolver.class); if (jsonSchemaResolver != null) { @@ -1231,7 +1231,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex ReactiveExecutor reactiveExecutor = getBeanForType(ReactiveExecutor.class); if (reactiveExecutor != null) { // already logged in CamelContext - getContext().setReactiveExecutor(reactiveExecutor); + getContext().adapt(ExtendedCamelContext.class).setReactiveExecutor(reactiveExecutor); } } } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java index 1d78021..be908ea 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.HeadersMapFactory; import org.junit.Test; @@ -32,7 +33,7 @@ public class CustomHeadersMapFactoryRouteTest extends ContextTestSupport { @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); - context.setHeadersMapFactory(custom); + context.adapt(ExtendedCamelContext.class).setHeadersMapFactory(custom); return context; } @@ -51,7 +52,7 @@ public class CustomHeadersMapFactoryRouteTest extends ContextTestSupport { assertMockEndpointsSatisfied(); - assertSame(custom, context.getHeadersMapFactory()); + assertSame(custom, context.adapt(ExtendedCamelContext.class).getHeadersMapFactory()); } @Override diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java index d906a80..052c7f4 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.engine.HashMapHeadersMapFactory; import org.junit.Test; @@ -30,7 +31,7 @@ public class HashMapHeadersMapFactoryRouteTest extends ContextTestSupport { @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); - context.setHeadersMapFactory(new HashMapHeadersMapFactory()); + context.adapt(ExtendedCamelContext.class).setHeadersMapFactory(new HashMapHeadersMapFactory()); return context; } diff --git a/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java b/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java index 9df52ae..616671b 100644 --- a/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java +++ b/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.headersmap; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.spi.HeadersMapFactory; @@ -34,7 +35,7 @@ public class CamelFastHeadersMapTest extends CamelTestSupport { assertMockEndpointsSatisfied(); // should have detected custom and use that - HeadersMapFactory factory = context.getHeadersMapFactory(); + HeadersMapFactory factory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory(); assertIsInstanceOf(FastHeadersMapFactory.class, factory); } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java index f9cb86d..2b7841a 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java @@ -143,7 +143,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti @Override public String getHeadersMapFactoryClassName() { - return context.getHeadersMapFactory().getClass().getName(); + return context.adapt(ExtendedCamelContext.class).getHeadersMapFactory().getClass().getName(); } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index 4e86dd6..ac78c4d 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -29,6 +29,7 @@ import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.MessageHistory; @@ -132,7 +133,7 @@ public final class DefaultExchange implements ExtendedExchange { return null; } - return context.getHeadersMapFactory().newMap(headers); + return context.adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(headers); } @SuppressWarnings("unchecked") diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java index 9a4da71..392191c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.util.ObjectHelper; @@ -212,11 +213,11 @@ public class DefaultMessage extends MessageSupport { public void setHeaders(Map<String, Object> headers) { ObjectHelper.notNull(getCamelContext(), "CamelContext", this); - if (getCamelContext().getHeadersMapFactory().isInstanceOf(headers)) { + if (getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().isInstanceOf(headers)) { this.headers = headers; } else { // create a new map - this.headers = getCamelContext().getHeadersMapFactory().newMap(headers); + this.headers = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(headers); } } @@ -247,7 +248,7 @@ public class DefaultMessage extends MessageSupport { protected Map<String, Object> createHeaders() { ObjectHelper.notNull(getCamelContext(), "CamelContext", this); - Map<String, Object> map = getCamelContext().getHeadersMapFactory().newMap(); + Map<String, Object> map = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(); populateInitialHeaders(map); return map; }