CAMEL-6588 Choose BlockingQueue implementation in Seda component with thanks to Gérald
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/276aa87e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/276aa87e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/276aa87e Branch: refs/heads/master Commit: 276aa87ec38bfd884a3dc52d8685d6be3922f553 Parents: f58ccf9 Author: Willem Jiang <ningji...@apache.org> Authored: Fri Aug 2 10:41:13 2013 +0800 Committer: Willem Jiang <ningji...@apache.org> Committed: Fri Aug 2 11:33:33 2013 +0800 ---------------------------------------------------------------------- .../seda/ArrayBlockingQueueFactory.java | 73 ++++++++++++++++++++ .../component/seda/BlockingQueueFactory.java | 38 ++++++++++ .../seda/LinkedBlockingQueueFactory.java | 35 ++++++++++ .../seda/PriorityBlockingQueueFactory.java | 54 +++++++++++++++ .../camel/component/seda/SedaComponent.java | 40 ++++++++--- .../camel/component/seda/SedaEndpoint.java | 21 ++++-- .../component/seda/SedaQueueFactoryTest.java | 66 ++++++++++++++++++ .../camel/component/seda/SedaQueueTest.java | 24 +++++++ 8 files changed, 338 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java new file mode 100644 index 0000000..3983ff8 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.ArrayBlockingQueue}
+ */
+public class ArrayBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    /**
+     * Capacity used by {@link #create()}, i.e. when the caller provides none
+     */
+    private int defaultCapacity = 50;
+    /**
+     * Lock fairness. null means default fairness (the queue is built without a fairness argument)
+     */
+    private Boolean fair;
+    /**
+     * @return Default array capacity
+     */
+    public int getDefaultCapacity() {
+        return defaultCapacity;
+    }
+
+    /**
+     * @param defaultCapacity Default array capacity
+     */
+    public void setDefaultCapacity(int defaultCapacity) {
+        this.defaultCapacity = defaultCapacity;
+    }
+
+    /**
+     * @return Lock fairness; {@code false} when none was set (the nullable field is never unboxed while null)
+     */
+    public boolean isFair() {
+        return fair != null && fair;
+    }
+
+    /**
+     * @param fair Lock fairness
+     */
+    public void setFair(boolean fair) {
+        this.fair = fair;
+    }
+
+    @Override
+    public ArrayBlockingQueue<E> create() {
+        return create(defaultCapacity);
+    }
+
+    @Override
+    public ArrayBlockingQueue<E> create(int capacity) {
+        // Honour the requested capacity (not defaultCapacity); a null fairness uses the one-argument constructor.
+        return fair == null ? new ArrayBlockingQueue<E>(capacity)
+                : new ArrayBlockingQueue<E>(capacity, fair);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java new file mode 100644 index 0000000..0d69433 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.BlockingQueue;
+import org.apache.camel.Exchange;
+
+/**
+ * Factory of {@link java.util.concurrent.BlockingQueue}
+ * @param <E> Element type, usually {@link Exchange}
+ */
+public interface BlockingQueueFactory<E> {
+    /**
+     * Create a new {@link java.util.concurrent.BlockingQueue} with the implementation's default capacity
+     * @return New {@link java.util.concurrent.BlockingQueue}
+     */
+    BlockingQueue<E> create();
+    /**
+     * Create a new {@link java.util.concurrent.BlockingQueue} with given capacity
+     * @param capacity Requested capacity of the new queue
+     * @return New {@link java.util.concurrent.BlockingQueue}
+     */
+    BlockingQueue<E> create(int capacity);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java new file mode 100644 index 0000000..096cd5b --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.LinkedBlockingQueue}.
+ * {@link #create()} uses the queue's no-argument constructor, {@link #create(int)} passes the capacity through.
+ */
+public class LinkedBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    @Override
+    public LinkedBlockingQueue<E> create() {
+        return new LinkedBlockingQueue<E>();
+    }
+
+    @Override
+    public LinkedBlockingQueue<E> create(int capacity) {
+        return new LinkedBlockingQueue<E>(capacity);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java new file mode 100644 index 0000000..da90d16 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.PriorityBlockingQueue}
+ */
+public class PriorityBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    /**
+     * Comparator used to sort exchanges; when null the queue is created without a comparator
+     */
+    private Comparator<E> comparator;
+
+    public Comparator<E> getComparator() {
+        return comparator;
+    }
+
+    public void setComparator(Comparator<E> comparator) {
+        this.comparator = comparator;
+    }
+
+    @Override
+    public PriorityBlockingQueue<E> create() {
+        return comparator==null ?
+            new PriorityBlockingQueue<E>() :
+            // PriorityBlockingQueue has a default initial capacity of 11
+            new PriorityBlockingQueue<E>(11, comparator);
+    }
+
+    @Override
+    public PriorityBlockingQueue<E> create(int capacity) {
+        // capacity is handed to the PriorityBlockingQueue constructor as its initial capacity
+        return comparator==null?
+            new PriorityBlockingQueue<E>(capacity):
+            new PriorityBlockingQueue<E>(capacity, comparator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index e6d5171..b13dd64 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -19,7 +19,6 @@ package org.apache.camel.component.seda; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -39,7 +38,7 @@ public class SedaComponent extends UriEndpointComponent { protected int queueSize; protected int defaultConcurrentConsumers = 1; private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>(); - + private BlockingQueueFactory<Exchange> defaultQueueFactory =new LinkedBlockingQueueFactory<Exchange>(); public SedaComponent() { super(SedaEndpoint.class); } @@ -60,15 +59,30 @@ public class SedaComponent extends UriEndpointComponent { return defaultConcurrentConsumers; } + public BlockingQueueFactory<Exchange> getDefaultQueueFactory() { + return defaultQueueFactory; + } + + public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) { + this.defaultQueueFactory = defaultQueueFactory; + } + /** - * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)} + * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)} */ @Deprecated public synchronized QueueReference getOrCreateQueue(String uri, Integer size) { return getOrCreateQueue(uri, 
size, null); } + /** + * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)} + */ public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) { + return getOrCreateQueue(uri, size, multipleConsumers, null); + } + + public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) { String key = getQueueKey(uri); QueueReference ref = getQueues().get(key); @@ -91,14 +105,15 @@ public class SedaComponent extends UriEndpointComponent { // create queue BlockingQueue<Exchange> queue; + BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory; if (size != null && size > 0) { - queue = new LinkedBlockingQueue<Exchange>(size); + queue = queueFactory.create(size); } else { if (getQueueSize() > 0) { size = getQueueSize(); - queue = new LinkedBlockingQueue<Exchange>(getQueueSize()); + queue = queueFactory.create(getQueueSize()); } else { - queue = new LinkedBlockingQueue<Exchange>(); + queue = queueFactory.create(); } } log.debug("Created queue {} with size {}", key, size); @@ -127,8 +142,17 @@ public class SedaComponent extends UriEndpointComponent { throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. 
ConcurrentConsumers cannot be set at a value greater than " + maxConcurrentConsumers + " was " + consumers); } - // defer creating queue till endpoint is started, so we pass in null - SedaEndpoint answer = new SedaEndpoint(uri, this, null, consumers); + // Resolve queue reference + BlockingQueue<Exchange> queue=resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class); + SedaEndpoint answer; + // Resolve queue factory when no queue specified + if (queue == null) { + BlockingQueueFactory<Exchange> queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class); + // defer creating queue till endpoint is started, so we pass the queue factory + answer = new SedaEndpoint(uri, this, queueFactory, consumers); + } else { + answer = new SedaEndpoint(uri, this, queue, consumers); + } answer.configureProperties(parameters); return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 656736c..d806d60 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -23,7 +23,6 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.camel.Component; import org.apache.camel.Consumer; @@ -79,8 +78,10 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, private int pollTimeout = 1000; @UriParam private boolean purgeWhenStopping; + private BlockingQueueFactory<Exchange> queueFactory; public 
SedaEndpoint() { + queueFactory = new LinkedBlockingQueueFactory<Exchange>(); } public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { @@ -88,11 +89,21 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) { - super(endpointUri, component); + this(endpointUri, component, concurrentConsumers); this.queue = queue; if (queue != null) { this.size = queue.remainingCapacity(); } + queueFactory = new LinkedBlockingQueueFactory<Exchange>(); + } + + public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) { + this(endpointUri, component, concurrentConsumers); + this.queueFactory = queueFactory; + } + + private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) { + super(endpointUri, component); this.concurrentConsumers = concurrentConsumers; } @@ -130,7 +141,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (getComponent() != null) { // use null to indicate default size (= use what the existing queue has been configured with) Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); - SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers()); + SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers(), queueFactory); queue = ref.getQueue(); String key = getComponent().getQueueKey(getEndpointUri()); LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? 
ref.getSize() : Integer.MAX_VALUE}); @@ -149,9 +160,9 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, protected BlockingQueue<Exchange> createQueue() { if (size > 0) { - return new LinkedBlockingQueue<Exchange>(size); + return queueFactory.create(size); } else { - return new LinkedBlockingQueue<Exchange>(); + return queueFactory.create(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java new file mode 100644 index 0000000..033ce80 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2013 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import static junit.framework.TestCase.assertEquals;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import static org.apache.camel.TestSupport.assertIsInstanceOf;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/**
+ * Checks which {@link BlockingQueue} implementation a seda endpoint exposes with and without the queueFactory option.
+ */
+public class SedaQueueFactoryTest extends ContextTestSupport {
+    private final ArrayBlockingQueueFactory<Exchange> arrayQueueFactory=new ArrayBlockingQueueFactory<Exchange>();
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry simpleRegistry=new SimpleRegistry();
+        simpleRegistry.put("arrayQueueFactory", arrayQueueFactory);
+        return new DefaultCamelContext(simpleRegistry);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testArrayBlockingQueueFactory() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue?queueFactory=#arrayQueueFactory", SedaEndpoint.class);
+
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        ArrayBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testArrayBlockingQueueFactoryAndSize() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue50?queueFactory=#arrayQueueFactory&size=50", SedaEndpoint.class);
+        // NOTE(review): size=50 equals ArrayBlockingQueueFactory's default capacity (50), so this cannot tell the two apart
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        ArrayBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+        assertEquals("remainingCapacity", 50, blockingQueue.remainingCapacity());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testDefaultBlockingQueueFactory() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:linkedQueue", SedaEndpoint.class);
+
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        LinkedBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java index bf30cdc..54ccdd5 100644 --- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java @@ -16,9 +16,16 @@ */ package org.apache.camel.component.seda; +import java.util.concurrent.ArrayBlockingQueue; +import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.hamcrest.CoreMatchers; +import org.junit.matchers.JUnitMatchers; /** * @version @@ -34,6 +41,21 @@ public class SedaQueueTest extends ContextTestSupport { template.sendBody("seda:foo?concurrentConsumers=5", "Goodday World"); template.sendBody("seda:bar", "Bar"); } + public void testQueueRef() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("seda:array?queue=#arrayQueue", "Hello World"); + + SedaEndpoint sedaEndpoint=resolveMandatoryEndpoint("seda:array?queue=#arrayQueue", SedaEndpoint.class); + assertTrue(sedaEndpoint.getQueue() instanceof ArrayBlockingQueue); + } + @Override + protected CamelContext createCamelContext() throws Exception { + SimpleRegistry 
simpleRegistry=new SimpleRegistry(); + simpleRegistry.put("arrayQueue", new ArrayBlockingQueue<Exchange>(10)); + return new DefaultCamelContext(simpleRegistry); + } @Override protected RouteBuilder createRouteBuilder() throws Exception { @@ -43,6 +65,8 @@ public class SedaQueueTest extends ContextTestSupport { from("seda:foo?size=20&concurrentConsumers=2").to("mock:result"); from("seda:bar").to("mock:result"); + + from("seda:array?queue=#arrayQueue").to("mock:result"); } }; }