This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0516ef9c413bc42085d3ee489b27272d26dbf166 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 07:14:14 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../component/rabbitmq/RabbitMQConsumerTest.java | 45 ++++++++++++++++++---- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java index a281bc3..79d2e11 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java @@ -24,33 +24,50 @@ import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; +import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExecutorServiceManager; import org.junit.jupiter.api.Test; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; public class RabbitMQConsumerTest { + private ExtendedCamelContext ecc = Mockito.mock(ExtendedCamelContext.class); + private ExchangeFactory ef = Mockito.mock(ExchangeFactory.class); private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class); private Connection conn = Mockito.mock(Connection.class); private Processor processor = 
Mockito.mock(Processor.class); private Channel channel = Mockito.mock(Channel.class); + private ExecutorServiceManager esm = Mockito.mock(ExecutorServiceManager.class); @Test public void testStoppingConsumerShutdownExecutor() throws Exception { - RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); - ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3); + Mockito.when(endpoint.createExecutor()).thenReturn(e); Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn); + Mockito.when(endpoint.getCamelContext()).thenReturn(ecc); + Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc); + Mockito.when(ecc.getExchangeFactory()).thenReturn(ef); + Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef); + Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm); + Mockito.when(esm.shutdownNow(e)).then(i -> e.shutdownNow()); Mockito.when(conn.createChannel()).thenReturn(channel); + RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); + consumer.doStart(); assertFalse(e.isShutdown()); @@ -60,13 +77,20 @@ public class RabbitMQConsumerTest { @Test public void testStoppingConsumerShutdownConnection() throws Exception { - RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); + ExecutorService es = Executors.newFixedThreadPool(3); - Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3)); + Mockito.when(endpoint.createExecutor()).thenReturn(es); Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(channel); + Mockito.when(endpoint.getCamelContext()).thenReturn(ecc); + Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc); + Mockito.when(ecc.getExchangeFactory()).thenReturn(ef); + 
Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef); + Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm); + Mockito.when(esm.shutdownNow(es)).then(i -> es.shutdownNow()); + RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); consumer.doStart(); consumer.doStop(); @@ -76,18 +100,25 @@ public class RabbitMQConsumerTest { @Test public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception { AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class); + ExecutorService es = Executors.newFixedThreadPool(3); - RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); - - Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3)); + Mockito.when(endpoint.createExecutor()).thenReturn(es); Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(channel); Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG"); Mockito.when(channel.isOpen()).thenReturn(false); + Mockito.when(endpoint.getCamelContext()).thenReturn(ecc); + Mockito.when(ecc.adapt(ExtendedCamelContext.class)).thenReturn(ecc); + Mockito.when(ecc.getExchangeFactory()).thenReturn(ef); + Mockito.when(ef.newExchangeFactory(any())).thenReturn(ef); + Mockito.when(ecc.getExecutorServiceManager()).thenReturn(esm); + Mockito.when(esm.shutdownNow(es)).then(i -> es.shutdownNow()); + Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG"); Mockito.doThrow(alreadyClosedException).when(channel).close(); + RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); consumer.doStart(); consumer.doStop();