Author: davsclaus
Date: Wed Jan 18 08:18:08 2012
New Revision: 1232782
URL: http://svn.apache.org/viewvc?rev=1232782&view=rev
Log:
CAMEL-4911: SedaConsumer should not poll if CamelContext is starting.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java - copied, changed from r1232750, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1232782&r1=1232781&r2=1232782&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Jan 18 08:18:08 2012 @@ -138,6 +138,18 @@ public class SedaConsumer extends Servic BlockingQueue<Exchange> queue = endpoint.getQueue(); // loop while we are allowed, or if we are stopping loop until the queue is empty while (queue != null && (isRunAllowed())) { + + // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started + if (getEndpoint().getCamelContext().getStatus().isStarting()) { + LOG.trace("CamelContext is starting so skip polling"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted, are we stopping? 
{}", isStopping() || isStopped()); + } + continue; + } + // do not poll if we are suspended if (isSuspending() || isSuspended()) { LOG.trace("Consumer is suspended so skip polling"); Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java (from r1232750, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java&r1=1232750&r2=1232782&rev=1232782&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java Wed Jan 18 08:18:08 2012 @@ -16,409 +16,69 @@ */ package org.apache.camel.management; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.seda.SedaEndpoint; +import org.apache.camel.impl.DefaultExchange; /** - * Tests mbeans is registered when adding a 2nd route after CamelContext has been started. + * Tests mbeans is registered when adding a 2nd route from within an existing route. 
* - * @version + * @version */ -public class ManagedRouteAddRemoveTest extends ManagementTestSupport { +public class ManagedRouteAddFromRouteTest extends ManagementTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").routeId("foo").to("mock:result"); - } - }; - } - - public void testRouteAddRemoteRouteWithTo() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); - result.assertIsSatisfied(); - - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Adding 2nd route"); - - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:bar").routeId("bar").to("mock:bar"); - } - }); - - // and send a message to it - MockEndpoint bar = getMockEndpoint("mock:bar"); - bar.expectedMessageCount(1); - template.sendBody("direct:bar", "Hello World"); - bar.assertIsSatisfied(); - - // there should be one more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(2, names.size()); - - log.info("Removing 2nd route"); - - // now remove the 2nd route - context.stopRoute("bar"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // the producer cache should have been removed - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Shutting down..."); - } - - public void testRouteAddRemoteRouteWithRecipientList() throws Exception { - 
MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); - result.assertIsSatisfied(); - - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Adding 2nd route"); - - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:bar").routeId("bar").recipientList(header("bar")); - } - }); - - // and send a message to it - MockEndpoint bar = getMockEndpoint("mock:bar"); - bar.expectedMessageCount(1); - template.sendBodyAndHeader("direct:bar", "Hello World", "bar", "mock:bar"); - bar.assertIsSatisfied(); - - // there should be one more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(2, names.size()); - - log.info("Removing 2nd route"); - - // now remove the 2nd route - context.stopRoute("bar"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // the producer cache should have been removed - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Shutting down..."); - } - - public void testRouteAddRemoteRouteWithRoutingSlip() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); - result.assertIsSatisfied(); - - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - 
assertEquals(1, names.size()); - - log.info("Adding 2nd route"); - - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:bar").routeId("bar").routingSlip(header("bar")); - } - }); - - // and send a message to it - MockEndpoint bar = getMockEndpoint("mock:bar"); - bar.expectedMessageCount(1); - template.sendBodyAndHeader("direct:bar", "Hello World", "bar", "mock:bar"); - bar.assertIsSatisfied(); - - // there should be one more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(2, names.size()); - - log.info("Removing 2nd route"); - - // now remove the 2nd route - context.stopRoute("bar"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // the producer cache should have been removed - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Shutting down..."); - } - - public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnException() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); - result.assertIsSatisfied(); - - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); + SedaEndpoint seda = context.getEndpoint("seda:start", SedaEndpoint.class); + seda.getQueue().put(new DefaultExchange(context)); - log.info("Adding 2nd route"); - - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:bar").routeId("bar") - .onException(Exception.class) - .handled(true) - 
.recipientList(header("error")) - .end().end() - .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced")); + from("seda:start").routeId("foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + RouteBuilder child = new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:bar").routeId("bar").to("mock:bar"); + } + }; + context.addRoutes(child); + } + }) + .to("mock:result"); } - }); - - // and send a message to it - getMockEndpoint("mock:bar").expectedMessageCount(1); - getMockEndpoint("mock:error").expectedMessageCount(1); - - Map<String, Object> headers = new HashMap<String, Object>(); - headers.put("error", "mock:error"); - headers.put("bar", "mock:bar"); - template.sendBodyAndHeaders("direct:bar", "Hello World", headers); - - assertMockEndpointsSatisfied(); - - // there should be two more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(3, names.size()); - - // now stop and remove the 2nd route - log.info("Stopping 2nd route"); - context.stopRoute("bar"); - - log.info("Removing 2nd route"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // the producer cache should have been removed - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Shutting down..."); + }; } - public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnException() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); - result.assertIsSatisfied(); - + public void testAddRouteFromRoute() throws Exception { MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); 
- - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Adding 2nd route"); - - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - onException(Exception.class) - .handled(true) - .recipientList(header("error")) - .end(); - - from("direct:bar").routeId("bar") - .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced")); - } - }); - - // and send a message to it - getMockEndpoint("mock:bar").expectedMessageCount(1); - getMockEndpoint("mock:error").expectedMessageCount(1); - - Map<String, Object> headers = new HashMap<String, Object>(); - headers.put("error", "mock:error"); - headers.put("bar", "mock:bar"); - template.sendBodyAndHeaders("direct:bar", "Hello World", headers); - - assertMockEndpointsSatisfied(); + ObjectName route1 = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\""); - // there should be two more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(3, names.size()); + // should be started + String state = (String) mbeanServer.getAttribute(route1, "State"); + assertEquals("Should be started", ServiceStatus.Started.name(), state); - // now stop and remove the 2nd route - log.info("Stopping 2nd route"); - context.stopRoute("bar"); - - log.info("Removing 2nd route"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // only the producer cache from the 2nd route should have been removed (the on exception becomes context scoped) - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(2, names.size()); - - log.info("Shutting down..."); - } - - public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnCompletion() throws Exception { MockEndpoint result = 
getMockEndpoint("mock:result"); result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); + template.sendBody("seda:start", "Hello World"); result.assertIsSatisfied(); - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Adding 2nd route"); + // find the 2nd route + ObjectName route2 = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"bar\""); - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:bar").routeId("bar") - .onCompletion() - .recipientList(header("done")) - .end().end() - .recipientList(header("bar")); - } - }); - - // and send a message to it - getMockEndpoint("mock:bar").expectedMessageCount(1); - getMockEndpoint("mock:done").expectedMessageCount(1); - - Map<String, Object> headers = new HashMap<String, Object>(); - headers.put("done", "mock:done"); - headers.put("bar", "mock:bar"); - template.sendBodyAndHeaders("direct:bar", "Hello World", headers); - - assertMockEndpointsSatisfied(); - - // there should be two more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(3, names.size()); - - // now stop and remove the 2nd route - log.info("Stopping 2nd route"); - context.stopRoute("bar"); - - log.info("Removing 2nd route"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // the producer cache should have been removed - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Shutting down..."); + // should be started + state = (String) mbeanServer.getAttribute(route2, 
"State"); + assertEquals("Should be started", ServiceStatus.Started.name(), state); } - public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnCompletion() throws Exception { - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - template.sendBody("direct:start", "Hello World"); - result.assertIsSatisfied(); - - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - - // number of producer caches - Set<ObjectName> names = mbeanServer.queryNames(on, null); - assertEquals(1, names.size()); - - log.info("Adding 2nd route"); - - // add a 2nd route - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - onCompletion() - .recipientList(header("done")) - .end(); - - from("direct:bar").routeId("bar") - .recipientList(header("bar")); - } - }); - - // and send a message to it - getMockEndpoint("mock:bar").expectedMessageCount(1); - getMockEndpoint("mock:done").expectedMessageCount(1); - - Map<String, Object> headers = new HashMap<String, Object>(); - headers.put("done", "mock:done"); - headers.put("bar", "mock:bar"); - template.sendBodyAndHeaders("direct:bar", "Hello World", headers); - - assertMockEndpointsSatisfied(); - - // there should be two more producer cache - names = mbeanServer.queryNames(on, null); - assertEquals(3, names.size()); - - // now stop and remove the 2nd route - log.info("Stopping 2nd route"); - context.stopRoute("bar"); - - log.info("Removing 2nd route"); - boolean removed = context.removeRoute("bar"); - assertTrue(removed); - - // only the producer cache from the 2nd route should have been removed (the on completion is context scoped) - on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*"); - names = mbeanServer.queryNames(on, null); - assertEquals(2, names.size()); - - log.info("Shutting 
down..."); - } }