CAMEL-6545 seda producer - Add option to fail for non existing queue with thanks to Christian Posta
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/783a2f52 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/783a2f52 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/783a2f52 Branch: refs/heads/master Commit: 783a2f52d18074389508c0525c49acf303ca18ae Parents: fd3794d Author: Willem Jiang <ningji...@apache.org> Authored: Thu Aug 22 08:48:43 2013 +0800 Committer: Willem Jiang <ningji...@apache.org> Committed: Thu Aug 22 08:48:43 2013 +0800 ---------------------------------------------------------------------- .../camel/component/seda/QueueReference.java | 93 ++++++++++++ .../camel/component/seda/SedaComponent.java | 89 ++++-------- .../seda/SedaConsumerNotAvailableException.java | 31 ++++ .../camel/component/seda/SedaEndpoint.java | 35 ++++- .../camel/component/seda/SedaProducer.java | 30 +++- .../apache/camel/component/vm/VmComponent.java | 1 + .../SedaComponentReferenceEndpointTest.java | 2 +- .../component/seda/SedaNoConsumerTest.java | 140 ++++++++++++++++++- .../vm/VmComponentReferenceEndpointTest.java | 4 +- 9 files changed, 346 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java b/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java new file mode 100644 index 0000000..5b7fd50 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/QueueReference.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +import org.apache.camel.Exchange; + +/** + * Holder for queue references. + * <p/> + * This is used to keep track of the usages of the queues, so we know when a queue is no longer + * in use, and can safely be discarded. + */ +public final class QueueReference { + + private final BlockingQueue<Exchange> queue; + private Integer size; + private Boolean multipleConsumers; + + private List<SedaEndpoint> endpoints = new LinkedList<SedaEndpoint>(); + + QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) { + this.queue = queue; + this.size = size; + this.multipleConsumers = multipleConsumers; + } + + synchronized void addReference(SedaEndpoint endpoint) { + if (!endpoints.contains(endpoint)) { + endpoints.add(endpoint); + } + } + + synchronized void removeReference(SedaEndpoint endpoint) { + if (endpoints.contains(endpoint)) { + endpoints.remove(endpoint); + } + } + + /** + * Gets the reference counter + */ + public synchronized int getCount() { + return endpoints.size(); + } + + /** + * Gets the queue size + * + * @return <tt>null</tt> if unbounded + */ + public Integer getSize() { + return size; + } + + public Boolean getMultipleConsumers() { + return multipleConsumers; + } + + /** + * Gets the queue + */ + public BlockingQueue<Exchange> getQueue() { + return queue; + } + + public synchronized boolean hasConsumers() { + for (SedaEndpoint endpoint : endpoints) { + if (endpoint.getConsumers().size() > 0) { + return true; + } + } + + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index 6b8d81a..64415b3 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -68,22 +68,22 @@ public class SedaComponent extends UriEndpointComponent { } /** - * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)} + * @deprecated use */ @Deprecated - public synchronized QueueReference getOrCreateQueue(String uri, Integer size) { - return getOrCreateQueue(uri, size, null); + public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) { + return getOrCreateQueue(endpoint, size, null); } /** - * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)} + * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)} */ - public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) { - return getOrCreateQueue(uri, size, multipleConsumers, null); + public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) { + return getOrCreateQueue(endpoint, size, multipleConsumers, null); } - public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) { - String key = getQueueKey(uri); + public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) { + String key = getQueueKey(endpoint.getEndpointUri()); QueueReference ref = getQueues().get(key); if (ref != null) { @@ -95,7 +95,7 @@ public class SedaComponent extends UriEndpointComponent { + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size); } // add the reference before returning queue - ref.addReference(); + ref.addReference(endpoint); if (log.isDebugEnabled()) { log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()}); @@ -120,12 +120,25 @@ public class SedaComponent extends UriEndpointComponent { // create and add a new reference queue ref = new QueueReference(queue, size, multipleConsumers); - ref.addReference(); + ref.addReference(endpoint); getQueues().put(key, ref); return ref; } + public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue queue) { + String key = getQueueKey(endpoint.getEndpointUri()); + + QueueReference ref = getQueues().get(key); + if (ref == null) { + ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers()); + ref.addReference(endpoint); + getQueues().put(key, ref); + } + + return ref; + } + public Map<String, QueueReference> getQueues() { return queues; } @@ -186,7 +199,7 @@ public class SedaComponent extends UriEndpointComponent { String key = getQueueKey(endpoint.getEndpointUri()); QueueReference ref = getQueues().get(key); if (ref != null) { - ref.removeReference(); + ref.removeReference(endpoint); if (ref.getCount() <= 0) { // reference no longer needed so remove from queues getQueues().remove(key); @@ -194,58 +207,4 @@ public class SedaComponent extends UriEndpointComponent { } } - /** - * Holder for queue references. - * <p/> - * This is used to keep track of the usages of the queues, so we know when a queue is no longer - * in use, and can safely be discarded. - */ - public static final class QueueReference { - - private final BlockingQueue<Exchange> queue; - private volatile int count; - private Integer size; - private Boolean multipleConsumers; - - private QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) { - this.queue = queue; - this.size = size; - this.multipleConsumers = multipleConsumers; - } - - void addReference() { - count++; - } - - void removeReference() { - count--; - } - - /** - * Gets the reference counter - */ - public int getCount() { - return count; - } - - /** - * Gets the queue size - * - * @return <tt>null</tt> if unbounded - */ - public Integer getSize() { - return size; - } - - public Boolean getMultipleConsumers() { - return multipleConsumers; - } - - /** - * Gets the queue - */ - public BlockingQueue<Exchange> getQueue() { - return queue; - } - } } http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java new file mode 100644 index 0000000..d49e0b6 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumerNotAvailableException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; + +/** + * @author <a href="http://www.christianposta.com/blog">Christian Posta</a> + */ +public class SedaConsumerNotAvailableException extends CamelExchangeException { + private static final long serialVersionUID = 683242306650809007L; + + public SedaConsumerNotAvailableException(String message, Exchange exchange) { + super(message, exchange); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 2b81768..5fe1b8c 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -78,6 +78,10 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, private int pollTimeout = 1000; @UriParam private boolean purgeWhenStopping; + + @UriParam + private boolean failIfNoConsumers; + private BlockingQueueFactory<Exchange> queueFactory; public SedaEndpoint() { @@ -95,6 +99,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, this.size = queue.remainingCapacity(); } queueFactory = new LinkedBlockingQueueFactory<Exchange>(); + getComponent().registerQueue(this, queue); } public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) { @@ -120,7 +125,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (getComponent() != null) { // all consumers must match having the same multipleConsumers options String key = getComponent().getQueueKey(getEndpointUri()); - SedaComponent.QueueReference ref = getComponent().getQueueReference(key); + QueueReference ref = getComponent().getQueueReference(key); if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) { // there is already a multiple consumers, so make sure they matches throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers " @@ -141,7 +146,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (getComponent() != null) { // use null to indicate default size (= use what the existing queue has been configured with) Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); - SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers(), queueFactory); + QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory); queue = ref.getQueue(); String key = getComponent().getQueueKey(getEndpointUri()); LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE}); @@ -166,6 +171,16 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } } + public synchronized QueueReference getQueueReference() { + String key = getComponent().getQueueKey(getEndpointUri()); + QueueReference ref = getComponent().getQueueReference(key); + if (ref == null) { + LOG.warn("There was no queue reference for this queue!"); + } + + return ref; + } + protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception { if (!multicastStarted && consumerMulticastProcessor != null) { // only start it on-demand to avoid starting it during stopping @@ -259,6 +274,15 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } @ManagedAttribute + public boolean isFailIfNoConsumers() { + return failIfNoConsumers; + } + + public void setFailIfNoConsumers(boolean failIfNoConsumers) { + this.failIfNoConsumers = failIfNoConsumers; + } + + @ManagedAttribute public boolean isMultipleConsumers() { return multipleConsumers; } @@ -468,4 +492,11 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, // clear queue, as we are shutdown, so if re-created then the queue must be updated queue = null; } + + public boolean hasConsumers() { + if (this.consumers == null) { + return false; + } + return this.consumers.size() > 0; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java index 3614460..ada3cde 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -27,11 +27,15 @@ import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @version */ public class SedaProducer extends DefaultAsyncProducer { + private static final transient Logger LOG = LoggerFactory.getLogger(SedaProducer.class); + /** * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used. */ @@ -120,7 +124,13 @@ public class SedaProducer extends DefaultAsyncProducer { }); log.trace("Adding Exchange to queue: {}", copy); - addToQueue(copy); + try { + addToQueue(copy); + } catch (SedaConsumerNotAvailableException e) { + exchange.setException(e); + callback.done(true); + return true; + } if (timeout > 0) { if (log.isTraceEnabled()) { @@ -156,7 +166,13 @@ public class SedaProducer extends DefaultAsyncProducer { // handover the completion so its the copy which performs that, as we do not wait Exchange copy = prepareCopy(exchange, true); log.trace("Adding Exchange to queue: {}", copy); - addToQueue(copy); + try { + addToQueue(copy); + } catch (SedaConsumerNotAvailableException e) { + exchange.setException(e); + callback.done(true); + return true; + } } // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done @@ -193,8 +209,14 @@ public class SedaProducer extends DefaultAsyncProducer { * * @param exchange the exchange to add to the queue */ - protected void addToQueue(Exchange exchange) { - BlockingQueue<Exchange> queue = endpoint.getQueue(); + protected void addToQueue(Exchange exchange) throws SedaConsumerNotAvailableException { + QueueReference queueReference = endpoint.getQueueReference(); + BlockingQueue<Exchange> queue = queueReference.getQueue(); + + if (endpoint.isFailIfNoConsumers() && !queueReference.hasConsumers()) { + LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange); + throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } if (blockWhenFull) { try { queue.put(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java b/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java index 32e72d8..392d3c1 100644 --- a/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.camel.component.seda.QueueReference; import org.apache.camel.component.seda.SedaComponent; /** http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java index 3a0b19b..9ea2fd0 100644 --- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaComponentReferenceEndpointTest.java @@ -78,7 +78,7 @@ public class SedaComponentReferenceEndpointTest extends ContextTestSupport { private int numberOfReferences(SedaComponent seda) { int num = 0; - Iterator<SedaComponent.QueueReference> it = seda.getQueues().values().iterator(); + Iterator<QueueReference> it = seda.getQueues().values().iterator(); while (it.hasNext()) { num += it.next().getCount(); } http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java index 6e36612..8a89baa 100644 --- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java @@ -16,22 +16,53 @@ */ package org.apache.camel.component.seda; +import java.util.concurrent.TimeUnit; + import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.builder.NotifyBuilder; import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; /** * @version */ public class SedaNoConsumerTest extends ContextTestSupport { + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testInOnly() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo?timeout=1000"); + } + }); + + context.start(); + NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create(); + // no problem for in only as we do not expect a reply template.sendBody("direct:start", "Hello World"); + notify.matches(2, TimeUnit.SECONDS); + } public void testInOut() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo?timeout=1000"); + } + }); + + context.start(); + try { template.requestBody("direct:start", "Hello World"); fail("Should throw an exception"); @@ -40,13 +71,112 @@ public class SedaNoConsumerTest extends ContextTestSupport { } } - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { + + @Test + public void testFailIfNoConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("seda:foo?timeout=1000"); + from("direct:start").to("seda:foo?failIfNoConsumers=true"); } - }; + }); + + context.start(); + + try { + template.sendBody("direct:start", "Hello World"); + fail("Should throw an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(SedaConsumerNotAvailableException.class, e.getCause()); + } + + } + + @Test + public void testFailIfNoConsuemerAndMultipleConsumerSetting() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?failIfNoConsumers=true&multipleConsumers=true").to("mock:foo"); + from("seda:foo?failIfNoConsumers=true&multipleConsumers=true").to("mock:bar"); + } + }); + + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + + + } + + @Test + public void testFailIfNoConsumesrAfterConsumersLeave() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?failIfNoConsumers=true").routeId("stopThisRoute").to("mock:foo"); + } + }); + + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo?failIfNoConsumers=true", "Hello World"); + + assertMockEndpointsSatisfied(); + + context.stopRoute("stopThisRoute"); + TimeUnit.MILLISECONDS.sleep(100); + try { + template.sendBody("seda:foo?failIfNoConsumers=true", "Hello World"); + fail("Should throw an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(SedaConsumerNotAvailableException.class, e.getCause()); + } + } + + @Test + public void testFailIfNoConsumersWithValidConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:in").to("seda:foo?failIfNoConsumers=true"); + + from("seda:foo").to("mock:foo"); + } + }); + + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:in", "Hello World"); + + assertMockEndpointsSatisfied(); + + } + + @Test + public void testConfigOnAConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + // these are the same! + from("seda:foo?failIfNoConsumers=true").to("log:test"); + from("seda:foo").to("log:test2"); + + } + }); + + context.start(); + Thread.sleep(2 * 1000); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/783a2f52/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java b/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java index 4f6077c..b74869e 100644 --- a/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/vm/VmComponentReferenceEndpointTest.java @@ -20,7 +20,7 @@ import java.util.Iterator; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.seda.SedaComponent; +import org.apache.camel.component.seda.QueueReference; /** * @@ -79,7 +79,7 @@ public class VmComponentReferenceEndpointTest extends ContextTestSupport { private int numberOfReferences(VmComponent vm) { int num = 0; - Iterator<SedaComponent.QueueReference> it = vm.getQueues().values().iterator(); + Iterator<QueueReference> it = vm.getQueues().values().iterator(); while (it.hasNext()) { num += it.next().getCount(); }