Author: akarpe
Date: Thu Oct 14 13:59:55 2010
New Revision: 1022538
URL: http://svn.apache.org/viewvc?rev=1022538&view=rev
Log:
CAMEL-3197 - Added a new Load Balancing algorithm that provides weighted
round-robin and weighted random support
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
(with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
(with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
(with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
(with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java?rev=1022538&r1=1022537&r2=1022538&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
Thu Oct 14 13:59:55 2010
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -35,12 +36,16 @@ import org.apache.camel.model.loadbalanc
import org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition;
import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition;
import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition;
+import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition;
import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
import org.apache.camel.processor.loadbalancer.StickyLoadBalancer;
import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRandomLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRoundRobinLoadBalancer;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CollectionStringBuffer;
@@ -58,7 +63,8 @@ public class LoadBalanceDefinition exten
@XmlElement(required = false, name = "random", type =
RandomLoadBalancerDefinition.class),
@XmlElement(required = false, name = "roundRobin", type =
RoundRobinLoadBalancerDefinition.class),
@XmlElement(required = false, name = "sticky", type =
StickyLoadBalancerDefinition.class),
- @XmlElement(required = false, name = "topic", type =
TopicLoadBalancerDefinition.class)}
+ @XmlElement(required = false, name = "topic", type =
TopicLoadBalancerDefinition.class),
+ @XmlElement(required = false, name = "weighted", type =
WeightedLoadBalancerDefinition.class)}
)
private LoadBalancerDefinition loadBalancerType;
@@ -182,6 +188,24 @@ public class LoadBalanceDefinition exten
}
/**
+ * Uses weighted load balancer
+ *
+ * @param roundRobin used to set the processor selection
algorithm.
+ * @param distributionRatioList ArrayList<Long> of weighted ratios for
distribution of messages.
+ * @return the builder
+ */
+ public LoadBalanceDefinition weighted(boolean roundRobin,
ArrayList<Integer> distributionRatioList) {
+ WeightedLoadBalancer weighted;
+ if (!roundRobin) {
+ weighted = new WeightedRandomLoadBalancer(distributionRatioList);
+ } else {
+ weighted = new
WeightedRoundRobinLoadBalancer(distributionRatioList);
+ }
+ loadBalancerType = new LoadBalancerDefinition(weighted);
+ return this;
+ }
+
+ /**
* Uses round robin load balancer
*
* @return the builder
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.loadbalancer;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.model.LoadBalancerDefinition;
+import org.apache.camel.processor.loadbalancer.LoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRandomLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRoundRobinLoadBalancer;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Represents an XML <sticky/> element
+ */
+...@xmlrootelement(name = "weighted")
+...@xmlaccessortype(XmlAccessType.FIELD)
+public class WeightedLoadBalancerDefinition extends LoadBalancerDefinition {
+
+ @XmlElement(name = "roundRobin", required = true)
+ private boolean roundRobin;
+
+ @XmlElement(name = "distributionRatios", required = true)
+ private ArrayList<Integer> distributionRatioList;
+
+ @Override
+ protected LoadBalancer createLoadBalancer(RouteContext routeContext) {
+ WeightedLoadBalancer loadBalancer = null;
+
+ try {
+ if (!roundRobin) {
+ loadBalancer = new
WeightedRandomLoadBalancer(distributionRatioList);
+ } else {
+ loadBalancer = new
WeightedRoundRobinLoadBalancer(distributionRatioList);
+ }
+ } catch (Exception e) {
+
+ }
+ return loadBalancer;
+ }
+
+ public boolean isRoundRobin() {
+ return roundRobin;
+ }
+
+ public void setRoundRobin(boolean roundRobin) {
+ this.roundRobin = roundRobin;
+ }
+
+ public ArrayList<Integer> getDistributionRatioList() {
+ return distributionRatioList;
+ }
+
+ public void setDistributionRatioList(ArrayList<Integer>
distributionRatioList) {
+ this.distributionRatioList = distributionRatioList;
+ }
+
+ @Override
+ public String toString() {
+ if (!roundRobin) {
+ return "WeightedRandomLoadBalancer[" + distributionRatioList + "]";
+ } else {
+ return "WeightedRoundRobinLoadBalancer[" + distributionRatioList +
"]";
+ }
+ }
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+public class DistributionRatio {
+ private int processorPosition;
+ private int distributionWeight;
+ private int runtimeWeight;
+
+ public DistributionRatio(int processorPosition, int distributionWeight) {
+ super();
+ this.processorPosition = processorPosition;
+ this.distributionWeight = distributionWeight;
+ this.runtimeWeight = distributionWeight;
+ }
+
+ public DistributionRatio(int processorPosition, int distributionWeight,
int runtimeWeight) {
+ super();
+ this.processorPosition = processorPosition;
+ this.distributionWeight = distributionWeight;
+ this.runtimeWeight = runtimeWeight;
+ }
+
+ public int getProcessorPosition() {
+ return processorPosition;
+ }
+
+ public void setProcessorPosition(int processorPosition) {
+ this.processorPosition = processorPosition;
+ }
+
+ public int getDistributionWeight() {
+ return distributionWeight;
+ }
+
+ public void setDistributionWeight(int distributionWeight) {
+ this.distributionWeight = distributionWeight;
+ }
+
+ public int getRuntimeWeight() {
+ return runtimeWeight;
+ }
+
+ public void setRuntimeWeight(int runtimeWeight) {
+ this.runtimeWeight = runtimeWeight;
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class WeightedLoadBalancer extends QueueLoadBalancer {
+ private static final transient Log LOG =
LogFactory.getLog(WeightedLoadBalancer.class);
+ private ArrayList<Integer> distributionRatioList = new
ArrayList<Integer>();
+ private ArrayList<DistributionRatio> runtimeRatios = new
ArrayList<DistributionRatio>();
+
+ public WeightedLoadBalancer(ArrayList<Integer> distributionRatios) {
+ deepCloneDistributionRatios(distributionRatios);
+ loadRuntimeRatios(distributionRatios);
+ }
+
+ protected void deepCloneDistributionRatios(ArrayList<Integer>
distributionRatios) {
+ for (Integer value : distributionRatios) {
+ this.distributionRatioList.add(value);
+ }
+ }
+
+ protected void loadRuntimeRatios(ArrayList<Integer> distributionRatios) {
+ int position = 0;
+
+ for (Integer value : distributionRatios) {
+ runtimeRatios.add(new DistributionRatio(position++,
value.intValue()));
+ }
+ }
+
+ protected void normalizeDistributionListAgainstProcessors(List<Processor>
processors) {
+ if (processors.size() > getDistributionRatioList().size()) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Listed Load Balance Processors do not match
distributionRatio. Best Effort distribution will be attempted");
+ LOG.warn("Number of Processors: " + processors.size() + ".
Number of DistibutionRatioList elements: " + getDistributionRatioList().size());
+ }
+ } else if (processors.size() < getDistributionRatioList().size()) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Listed Load Balance Processors do not match
distributionRatio. Best Effort distribution will be attempted");
+ LOG.warn("Number of Processors: " + processors.size() + ".
Number of DistibutionRatioList elements: " + getDistributionRatioList().size());
+ }
+ for (int i = processors.size(); i <
getDistributionRatioList().size(); i++) {
+ getDistributionRatioList().set(i, 0);
+ getRuntimeRatios().remove(i);
+ }
+ }
+ }
+
+ protected boolean isRuntimeRatiosZeroed() {
+ boolean cleared = true;
+
+ for (DistributionRatio runtimeRatio : runtimeRatios) {
+ if (runtimeRatio.getRuntimeWeight() > 0) {
+ cleared = false;
+ }
+ }
+ return cleared;
+ }
+
+ protected void resetRuntimeRatios() {
+ for (DistributionRatio runtimeRatio : runtimeRatios) {
+
runtimeRatio.setRuntimeWeight(runtimeRatio.getDistributionWeight());
+ }
+ }
+
+ public ArrayList<Integer> getDistributionRatioList() {
+ return distributionRatioList;
+ }
+
+ public void setDistributionRatioList(ArrayList<Integer>
distributionRatioList) {
+ this.distributionRatioList = distributionRatioList;
+ }
+
+ public ArrayList<DistributionRatio> getRuntimeRatios() {
+ return runtimeRatios;
+ }
+
+ public void setRuntimeRatios(ArrayList<DistributionRatio> runtimeRatios) {
+ this.runtimeRatios = runtimeRatios;
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+public class WeightedRandomLoadBalancer extends WeightedLoadBalancer {
+ private int randomCounter;
+
+ public WeightedRandomLoadBalancer(ArrayList<Integer> distributionRatios) {
+ super(distributionRatios);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.camel.processor.loadbalancer.QueueLoadBalancer#chooseProcessor(java.util.List,
org.apache.camel.Exchange)
+ */
+ @Override
+ protected Processor chooseProcessor(List<Processor> processors,
+ Exchange exchange) {
+
+ normalizeDistributionListAgainstProcessors(processors);
+
+ boolean found = false;
+
+ while (!found) {
+ if (getRuntimeRatios().isEmpty()) {
+ loadRuntimeRatios(getDistributionRatioList());
+ }
+
+ randomCounter = 0;
+ if (getRuntimeRatios().size() > 0) {
+ randomCounter = new
Random().nextInt(getRuntimeRatios().size());
+ }
+
+ if (getRuntimeRatios().get(randomCounter).getRuntimeWeight() > 0) {
+
getRuntimeRatios().get(randomCounter).setRuntimeWeight((getRuntimeRatios().get(randomCounter).getRuntimeWeight())
- 1);
+ found = true;
+ break;
+ } else {
+ getRuntimeRatios().remove(randomCounter);
+ }
+ }
+
+ return
processors.get(getRuntimeRatios().get(randomCounter).getProcessorPosition());
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+public class WeightedRoundRobinLoadBalancer extends WeightedLoadBalancer {
+ private int counter;
+
+ public WeightedRoundRobinLoadBalancer(ArrayList<Integer>
distributionRatios) {
+ super(distributionRatios);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.camel.processor.loadbalancer.QueueLoadBalancer#chooseProcessor(java.util.List,
org.apache.camel.Exchange)
+ */
+ @Override
+ protected Processor chooseProcessor(List<Processor> processors,
+ Exchange exchange) {
+
+ normalizeDistributionListAgainstProcessors(processors);
+
+ if (isRuntimeRatiosZeroed()) {
+ resetRuntimeRatios();
+ counter = 0;
+ }
+
+ boolean found = false;
+
+ while (!found) {
+ if (counter >= getRuntimeRatios().size()) {
+ counter = 0;
+ }
+
+ if (getRuntimeRatios().get(counter).getRuntimeWeight() > 0) {
+
getRuntimeRatios().get(counter).setRuntimeWeight((getRuntimeRatios().get(counter).getRuntimeWeight())
- 1);
+ found = true;
+ break;
+ } else {
+ counter++;
+ }
+ }
+
+ return processors.get(counter++);
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class WeightedRandomLoadBalanceTest extends ContextTestSupport {
+ protected MockEndpoint x;
+ protected MockEndpoint y;
+ protected MockEndpoint z;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ x = getMockEndpoint("mock:x");
+ y = getMockEndpoint("mock:y");
+ z = getMockEndpoint("mock:z");
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.camel.ContextTestSupport#isUseRouteBuilder()
+ */
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testRandom() throws Exception {
+
+ x.expectedMessageCount(4);
+ y.expectedMessageCount(2);
+ z.expectedMessageCount(1);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ ArrayList<Integer> distributionRatio = new
ArrayList<Integer>();
+ distributionRatio.add(4);
+ distributionRatio.add(2);
+ distributionRatio.add(1);
+
+ // START SNIPPET: example
+ from("direct:start").loadBalance().
+ weighted(false, distributionRatio).to("mock:x", "mock:y",
"mock:z");
+ // END SNIPPET: example
+ }
+ });
+ context.start();
+
+ sendMessages(1, 2, 3, 4, 5, 6, 7);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testRandom2() throws Exception {
+
+ x.expectedMessageCount(2);
+ y.expectedMessageCount(1);
+ z.expectedMessageCount(3);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ ArrayList<Integer> distributionRatio = new
ArrayList<Integer>();
+ distributionRatio.add(2);
+ distributionRatio.add(1);
+ distributionRatio.add(3);
+
+ // START SNIPPET: example
+ from("direct:start").loadBalance().
+ weighted(false, distributionRatio).to("mock:x", "mock:y",
"mock:z");
+ // END SNIPPET: example
+ }
+ });
+ context.start();
+
+ sendMessages(1, 2, 3, 4, 5, 6);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testRandomBulk() throws Exception {
+
+ x.expectedMessageCount(10);
+ y.expectedMessageCount(15);
+ z.expectedMessageCount(25);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ ArrayList<Integer> distributionRatio = new
ArrayList<Integer>();
+ distributionRatio.add(2);
+ distributionRatio.add(3);
+ distributionRatio.add(5);
+
+ // START SNIPPET: example
+ from("direct:start").loadBalance().
+ weighted(false, distributionRatio).to("mock:x", "mock:y",
"mock:z");
+ // END SNIPPET: example
+ }
+ });
+ context.start();
+
+ sendBulkMessages(50);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBulkMessages(int number) {
+ for (int i = 0; i < number; i++) {
+ template.sendBodyAndHeader("direct:start", createTestMessage(i),
"counter", i);
+ }
+ }
+
+ protected void sendMessages(int... counters) {
+ for (int counter : counters) {
+ template.sendBodyAndHeader("direct:start",
createTestMessage(counter), "counter", counter);
+ }
+ }
+
+ private String createTestMessage(int counter) {
+ return "<message>" + counter + "</message>";
+ }
+
+ protected Object[] listOfMessages(int... counters) {
+ List<String> list = new ArrayList<String>(counters.length);
+ for (int counter : counters) {
+ list.add(createTestMessage(counter));
+ }
+ return list.toArray();
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java?rev=1022538&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
+
+public class WeightedRoundRobinLoadBalanceTest extends ContextTestSupport {
+ protected MockEndpoint x;
+ protected MockEndpoint y;
+ protected MockEndpoint z;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ x = getMockEndpoint("mock:x");
+ y = getMockEndpoint("mock:y");
+ z = getMockEndpoint("mock:z");
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.camel.ContextTestSupport#isUseRouteBuilder()
+ */
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testRoundRobin() throws Exception {
+
+ x.expectedMessageCount(5);
+ y.expectedMessageCount(2);
+ z.expectedMessageCount(1);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ ArrayList<Integer> distributionRatio = new
ArrayList<Integer>();
+ distributionRatio.add(4);
+ distributionRatio.add(2);
+ distributionRatio.add(1);
+
+ // START SNIPPET: example
+ from("direct:start").loadBalance().
+ weighted(true, distributionRatio).to("mock:x", "mock:y",
"mock:z");
+ // END SNIPPET: example
+ }
+ });
+ context.start();
+
+ sendMessages(1, 2, 3, 4, 5, 6, 7, 8);
+
+ assertMockEndpointsSatisfied();
+ x.expectedBodiesReceived(1, 4, 6, 7, 8);
+ y.expectedBodiesReceived(2, 5);
+ z.expectedBodiesReceived(3);
+ }
+
+ public void testRoundRobin2() throws Exception {
+
+ x.expectedMessageCount(3);
+ y.expectedMessageCount(1);
+ z.expectedMessageCount(3);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ ArrayList<Integer> distributionRatio = new
ArrayList<Integer>();
+ distributionRatio.add(2);
+ distributionRatio.add(1);
+ distributionRatio.add(3);
+
+ // START SNIPPET: example
+ from("direct:start").loadBalance().
+ weighted(true, distributionRatio).to("mock:x", "mock:y",
"mock:z");
+ // END SNIPPET: example
+ }
+ });
+ context.start();
+
+ sendMessages(1, 2, 3, 4, 5, 6, 7);
+
+ assertMockEndpointsSatisfied();
+ x.expectedBodiesReceived(1, 4, 7);
+ y.expectedBodiesReceived(2);
+ z.expectedBodiesReceived(3, 5, 6);
+ }
+
+ public void testRoundRobinBulk() throws Exception {
+
+ x.expectedMessageCount(10);
+ y.expectedMessageCount(15);
+ z.expectedMessageCount(25);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ ArrayList<Integer> distributionRatio = new
ArrayList<Integer>();
+ distributionRatio.add(2);
+ distributionRatio.add(3);
+ distributionRatio.add(5);
+
+ // START SNIPPET: example
+ from("direct:start").loadBalance().
+ weighted(true, distributionRatio).to("mock:x", "mock:y",
"mock:z");
+ // END SNIPPET: example
+ }
+ });
+ context.start();
+
+ sendBulkMessages(50);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBulkMessages(int number) {
+ for (int i = 0; i < number; i++) {
+ template.sendBodyAndHeader("direct:start", createTestMessage(i),
"counter", i);
+ }
+ }
+
+ protected void sendMessages(int... counters) {
+ for (int counter : counters) {
+ template.sendBodyAndHeader("direct:start",
createTestMessage(counter), "counter", counter);
+ }
+ }
+
+ private String createTestMessage(int counter) {
+ return "<message>" + counter + "</message>";
+ }
+
+ protected Object[] listOfMessages(int... counters) {
+ List<String> list = new ArrayList<String>(counters.length);
+ for (int counter : counters) {
+ list.add(createTestMessage(counter));
+ }
+ return list.toArray();
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native