Author: davsclaus
Date: Mon Mar 26 14:10:47 2012
New Revision: 1305365

URL: http://svn.apache.org/viewvc?rev=1305365&view=rev
Log:
CAMEL-4160: Polished dataset component. Optimized to not need for copy 
exchange, which helps reduce memory usage.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java
      - copied, changed from r1305240, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=1305365&r1=1305364&r2=1305365&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
 Mon Mar 26 14:10:47 2012
@@ -46,17 +46,20 @@ public class DataSetEndpoint extends Moc
     private long consumeDelay;
     private long preloadSize;
     private long initialDelay = 1000;
-    private Processor reporter;
 
     @Deprecated
     public DataSetEndpoint() {
         this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
+        // optimize as we dont need to copy the exchange
+        copyOnExchange = false;
     }
 
     public DataSetEndpoint(String endpointUri, Component component, DataSet 
dataSet) {
         super(endpointUri, component);
         this.dataSet = dataSet;
         this.log = LoggerFactory.getLogger(endpointUri);
+        // optimize as we dont need to copy the exchange
+        copyOnExchange = false;
     }
 
     public static void assertEquals(String description, Object expected, 
Object actual, Exchange exchange) {
@@ -160,13 +163,6 @@ public class DataSetEndpoint extends Moc
         this.produceDelay = produceDelay;
     }
 
-    /**
-     * Sets a custom progress reporter
-     */
-    public void setReporter(Processor reporter) {
-        this.reporter = reporter;
-    }
-
     public long getInitialDelay() {
         return initialDelay;
     }
@@ -192,10 +188,6 @@ public class DataSetEndpoint extends Moc
 
         assertMessageExpected(index, expected, copy);
 
-        if (reporter != null) {
-            reporter.process(copy);
-        }
-
         if (consumeDelay > 0) {
             Thread.sleep(consumeDelay);
         }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=1305365&r1=1305364&r2=1305365&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 Mon Mar 26 14:10:47 2012
@@ -79,6 +79,8 @@ public class MockEndpoint extends Defaul
     private static final transient Logger LOG = 
LoggerFactory.getLogger(MockEndpoint.class);
     // must be volatile so changes is visible between the thread which 
performs the assertions
     // and the threads which process the exchanges when routing messages in 
Camel
+    protected volatile Processor reporter;
+    protected boolean copyOnExchange = true;
     private volatile int expectedCount;
     private volatile int counter;
     private volatile Processor defaultProcessor;
@@ -98,7 +100,6 @@ public class MockEndpoint extends Defaul
     private volatile Map<String, Object> actualHeaderValues;
     private volatile Map<String, Object> expectedPropertyValues;
     private volatile Map<String, Object> actualPropertyValues;
-    private volatile Processor reporter;
     private volatile int retainFirst;
     private volatile int retainLast;
 
@@ -1040,7 +1041,8 @@ public class MockEndpoint extends Defaul
      * You can configure both {@link #setRetainFirst(int)} and {@link 
#setRetainLast(int)} methods,
      * to limit both the first and last received.
      * 
-     * @param retainFirst  to limit and only keep the first n'th received 
{@link Exchange}s
+     * @param retainFirst  to limit and only keep the first n'th received 
{@link Exchange}s, use
+     *                     <tt>0</tt> to not retain any messages, or 
<tt>-1</tt> to retain all.
      * @see #setRetainLast(int)
      */
     public void setRetainFirst(int retainFirst) {
@@ -1067,7 +1069,8 @@ public class MockEndpoint extends Defaul
      * You can configure both {@link #setRetainFirst(int)} and {@link 
#setRetainLast(int)} methods,
      * to limit both the first and last received.
      *
-     * @param retainLast  to limit and only keep the last n'th received {@link 
Exchange}s
+     * @param retainLast  to limit and only keep the last n'th received {@link 
Exchange}s, use
+     *                     <tt>0</tt> to not retain any messages, or 
<tt>-1</tt> to retain all.
      * @see #setRetainFirst(int)
      */
     public void setRetainLast(int retainLast) {
@@ -1096,8 +1099,8 @@ public class MockEndpoint extends Defaul
         actualHeaderValues = null;
         expectedPropertyValues = null;
         actualPropertyValues = null;
-        retainFirst = 0;
-        retainLast = 0;
+        retainFirst = -1;
+        retainLast = -1;
     }
 
     protected synchronized void onExchange(Exchange exchange) {
@@ -1105,8 +1108,11 @@ public class MockEndpoint extends Defaul
             if (reporter != null) {
                 reporter.process(exchange);
             }
-            // copy the exchange so the mock stores the copy and not the 
actual exchange
-            Exchange copy = ExchangeHelper.createCopy(exchange, true);
+            Exchange copy = exchange;
+            if (copyOnExchange) {
+                // copy the exchange so the mock stores the copy and not the 
actual exchange
+                copy = ExchangeHelper.createCopy(exchange, true);
+            }
             performAssertions(exchange, copy);
         } catch (Throwable e) {
             // must catch java.lang.Throwable as AssertionException extends 
java.lang.Error
@@ -1197,10 +1203,13 @@ public class MockEndpoint extends Defaul
      * @param copy  a copy of the received exchange
      */
     protected void addReceivedExchange(Exchange copy) {
-        if (retainFirst <= 0 && retainLast <= 0) {
+        if (retainFirst == 0 && retainLast == 0) {
+            // do not retain any messages at all
+        } else if (retainFirst < 0 && retainLast < 0) {
             // no limitation so keep them all
             receivedExchanges.add(copy);
         } else {
+            // okay there is some sort of limitations, so figure out what to 
retain
             if (retainFirst > 0 && counter <= retainFirst) {
                 // store a copy as its within the retain first limitation
                 receivedExchanges.add(copy);

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java
 (from r1305240, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java&r1=1305240&r2=1305365&rev=1305365&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/BigDataSetTest.java
 Mon Mar 26 14:10:47 2012
@@ -16,21 +16,24 @@
  */
 package org.apache.camel.component.dataset;
 
+import java.util.concurrent.TimeUnit;
 import javax.naming.Context;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version 
  */
-public class DataSetTest extends ContextTestSupport {
-    protected SimpleDataSet dataSet = new SimpleDataSet(20);
+public class BigDataSetTest extends ContextTestSupport {
+    protected SimpleDataSet dataSet = new SimpleDataSet(20000);
 
     public void testDataSet() throws Exception {
         // data set will itself set its assertions so we should just
         // assert that all mocks is ok
-        assertMockEndpointsSatisfied();
+        // TODO: For testing with bigger number of messages that takes a 
longer time
+        // MockEndpoint.assertIsSatisfied(context, 5, TimeUnit.MINUTES);
     }
 
     @Override
@@ -45,8 +48,9 @@ public class DataSetTest extends Context
         return new RouteBuilder() {
             public void configure() throws Exception {
                 // start this first to make sure the "direct:foo" consumer is 
ready
-                from("direct:foo").to("dataset:foo?minRate=50");
-                from("dataset:foo?minRate=50").to("direct:foo");
+                from("direct:foo").to("dataset:foo");
+
+                from("dataset:foo").to("direct:foo");
             }
         };
     }


Reply via email to