Author: davsclaus
Date: Mon Mar 26 14:10:47 2012
New Revision: 1305365
URL: http://svn.apache.org/viewvc?rev=1305365&view=rev
Log:
CAMEL-4160: Polished dataset component. Optimized so the exchange no longer
needs to be copied, which helps reduce memory usage.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java
- copied, changed from r1305240,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=1305365&r1=1305364&r2=1305365&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
Mon Mar 26 14:10:47 2012
@@ -46,17 +46,20 @@ public class DataSetEndpoint extends Moc
private long consumeDelay;
private long preloadSize;
private long initialDelay = 1000;
- private Processor reporter;
@Deprecated
public DataSetEndpoint() {
this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
+ // optimize as we dont need to copy the exchange
+ copyOnExchange = false;
}
public DataSetEndpoint(String endpointUri, Component component, DataSet
dataSet) {
super(endpointUri, component);
this.dataSet = dataSet;
this.log = LoggerFactory.getLogger(endpointUri);
+ // optimize as we dont need to copy the exchange
+ copyOnExchange = false;
}
public static void assertEquals(String description, Object expected,
Object actual, Exchange exchange) {
@@ -160,13 +163,6 @@ public class DataSetEndpoint extends Moc
this.produceDelay = produceDelay;
}
- /**
- * Sets a custom progress reporter
- */
- public void setReporter(Processor reporter) {
- this.reporter = reporter;
- }
-
public long getInitialDelay() {
return initialDelay;
}
@@ -192,10 +188,6 @@ public class DataSetEndpoint extends Moc
assertMessageExpected(index, expected, copy);
- if (reporter != null) {
- reporter.process(copy);
- }
-
if (consumeDelay > 0) {
Thread.sleep(consumeDelay);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=1305365&r1=1305364&r2=1305365&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
Mon Mar 26 14:10:47 2012
@@ -79,6 +79,8 @@ public class MockEndpoint extends Defaul
private static final transient Logger LOG =
LoggerFactory.getLogger(MockEndpoint.class);
// must be volatile so changes is visible between the thread which
performs the assertions
// and the threads which process the exchanges when routing messages in
Camel
+ protected volatile Processor reporter;
+ protected boolean copyOnExchange = true;
private volatile int expectedCount;
private volatile int counter;
private volatile Processor defaultProcessor;
@@ -98,7 +100,6 @@ public class MockEndpoint extends Defaul
private volatile Map<String, Object> actualHeaderValues;
private volatile Map<String, Object> expectedPropertyValues;
private volatile Map<String, Object> actualPropertyValues;
- private volatile Processor reporter;
private volatile int retainFirst;
private volatile int retainLast;
@@ -1040,7 +1041,8 @@ public class MockEndpoint extends Defaul
* You can configure both {@link #setRetainFirst(int)} and {@link
#setRetainLast(int)} methods,
* to limit both the first and last received.
*
- * @param retainFirst to limit and only keep the first n'th received
{@link Exchange}s
+ * @param retainFirst to limit and only keep the first n'th received
{@link Exchange}s, use
+ * <tt>0</tt> to not retain any messages, or
<tt>-1</tt> to retain all.
* @see #setRetainLast(int)
*/
public void setRetainFirst(int retainFirst) {
@@ -1067,7 +1069,8 @@ public class MockEndpoint extends Defaul
* You can configure both {@link #setRetainFirst(int)} and {@link
#setRetainLast(int)} methods,
* to limit both the first and last received.
*
- * @param retainLast to limit and only keep the last n'th received {@link
Exchange}s
+ * @param retainLast to limit and only keep the last n'th received {@link
Exchange}s, use
+ * <tt>0</tt> to not retain any messages, or
<tt>-1</tt> to retain all.
* @see #setRetainFirst(int)
*/
public void setRetainLast(int retainLast) {
@@ -1096,8 +1099,8 @@ public class MockEndpoint extends Defaul
actualHeaderValues = null;
expectedPropertyValues = null;
actualPropertyValues = null;
- retainFirst = 0;
- retainLast = 0;
+ retainFirst = -1;
+ retainLast = -1;
}
protected synchronized void onExchange(Exchange exchange) {
@@ -1105,8 +1108,11 @@ public class MockEndpoint extends Defaul
if (reporter != null) {
reporter.process(exchange);
}
- // copy the exchange so the mock stores the copy and not the
actual exchange
- Exchange copy = ExchangeHelper.createCopy(exchange, true);
+ Exchange copy = exchange;
+ if (copyOnExchange) {
+ // copy the exchange so the mock stores the copy and not the
actual exchange
+ copy = ExchangeHelper.createCopy(exchange, true);
+ }
performAssertions(exchange, copy);
} catch (Throwable e) {
// must catch java.lang.Throwable as AssertionException extends
java.lang.Error
@@ -1197,10 +1203,13 @@ public class MockEndpoint extends Defaul
* @param copy a copy of the received exchange
*/
protected void addReceivedExchange(Exchange copy) {
- if (retainFirst <= 0 && retainLast <= 0) {
+ if (retainFirst == 0 && retainLast == 0) {
+ // do not retain any messages at all
+ } else if (retainFirst < 0 && retainLast < 0) {
// no limitation so keep them all
receivedExchanges.add(copy);
} else {
+ // okay there is some sort of limitations, so figure out what to
retain
if (retainFirst > 0 && counter <= retainFirst) {
// store a copy as its within the retain first limitation
receivedExchanges.add(copy);
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java
(from r1305240,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java&r1=1305240&r2=1305365&rev=1305365&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java
Mon Mar 26 14:10:47 2012
@@ -16,21 +16,24 @@
*/
package org.apache.camel.component.dataset;
+import java.util.concurrent.TimeUnit;
import javax.naming.Context;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* @version
*/
-public class DataSetTest extends ContextTestSupport {
- protected SimpleDataSet dataSet = new SimpleDataSet(20);
+public class BigDataSetTest extends ContextTestSupport {
+ protected SimpleDataSet dataSet = new SimpleDataSet(20000);
public void testDataSet() throws Exception {
// data set will itself set its assertions so we should just
// assert that all mocks is ok
- assertMockEndpointsSatisfied();
+ // TODO: For testing with bigger number of messages that takes a
longer time
+ // MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.MINUTES);
}
@Override
@@ -45,8 +48,9 @@ public class DataSetTest extends Context
return new RouteBuilder() {
public void configure() throws Exception {
// start this first to make sure the "direct:foo" consumer is
ready
- from("direct:foo").to("dataset:foo?minRate=50");
- from("dataset:foo?minRate=50").to("direct:foo");
+ from("direct:foo").to("dataset:foo");
+
+ from("dataset:foo").to("direct:foo");
}
};
}