Added option to disable the CamelDataSetIndex header generation and processing


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b98c5c8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b98c5c8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b98c5c8b

Branch: refs/heads/master
Commit: b98c5c8b275741cc56caa1f201bfbe8400bcfba2
Parents: ad64911
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Fri Mar 11 12:42:40 2016 -0700
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Mar 14 19:23:15 2016 +0100

----------------------------------------------------------------------
 .../component/dataset/DataSetEndpoint.java      |  35 ++-
 ...ponentConfigurationAndDocumentationTest.java |  11 +-
 .../component/dataset/DataSetConsumerTest.java  |  76 ++++-
 .../component/dataset/DataSetProducerTest.java  | 288 ++++++++++++++++++-
 4 files changed, 398 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
 
b/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
index d05f5a9..21fe5de 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
@@ -60,6 +60,8 @@ public class DataSetEndpoint extends MockEndpoint implements 
Service {
     private long preloadSize;
     @UriParam(label = "consumer", defaultValue = "1000")
     private long initialDelay = 1000;
+    @UriParam(label = "consumer,producer", defaultValue = "null")
+    private Boolean disableDataSetIndex;
 
     @Deprecated
     public DataSetEndpoint() {
@@ -120,8 +122,10 @@ public class DataSetEndpoint extends MockEndpoint 
implements Service {
         Exchange exchange = createExchange();
         getDataSet().populateMessage(exchange, messageIndex);
 
-        Message in = exchange.getIn();
-        in.setHeader(Exchange.DATASET_INDEX, messageIndex);
+        if (null == disableDataSetIndex || !disableDataSetIndex) {
+            Message in = exchange.getIn();
+            in.setHeader(Exchange.DATASET_INDEX, messageIndex);
+        }
 
         return exchange;
     }
@@ -206,6 +210,24 @@ public class DataSetEndpoint extends MockEndpoint 
implements Service {
         this.initialDelay = initialDelay;
     }
 
+    /**
+     * Flag to disable setting the CamelDataSetIndex header.
+     * For Consumers:
+     * - true => the header will not be set
+     * - false/null/unset => the header will be set
+     * For Producers:
+     * - true => the header value will not be verified, and will not be set if 
it is not present
+     * - false => the header value must be present and will be verified
+     * - null/unset => the header value will be verified if it is present, and 
will be set if it is not present
+     */
+    public void setDisableDataSetIndex(boolean disableDataSetIndex) {
+        this.disableDataSetIndex = disableDataSetIndex;
+    }
+
+    public boolean getDisableDataSetIndex() {
+        return disableDataSetIndex;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -234,7 +256,14 @@ public class DataSetEndpoint extends MockEndpoint 
implements Service {
     }
 
     protected void assertMessageExpected(long index, Exchange expected, 
Exchange actual) throws Exception {
-        if (null != actual.getIn().getHeader(Exchange.DATASET_INDEX)) {
+        if (null == disableDataSetIndex) {
+            Long dataSetIndexHeaderValue = 
actual.getIn().getHeader(Exchange.DATASET_INDEX, Long.class);
+            if (null != dataSetIndexHeaderValue) {
+                assertEquals("Header: " + Exchange.DATASET_INDEX, index, 
dataSetIndexHeaderValue, actual);
+            } else {
+                actual.getIn().setHeader(Exchange.DATASET_INDEX, index);
+            }
+        } else if (!disableDataSetIndex) {
             long actualCounter = ExchangeHelper.getMandatoryHeader(actual, 
Exchange.DATASET_INDEX, Long.class);
             assertEquals("Header: " + Exchange.DATASET_INDEX, index, 
actualCounter, actual);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java
index f0e0ecf..32a5ede 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java
@@ -33,9 +33,14 @@ public class 
DataSetComponentConfigurationAndDocumentationTest extends ContextTe
     @Test
     public void testComponentConfiguration() throws Exception {
         DataSetComponent comp = context.getComponent("dataset", 
DataSetComponent.class);
-        EndpointConfiguration conf = 
comp.createConfiguration("dataset:foo?minRate=3");
-
-        assertEquals("3", conf.getParameter("minRate"));
+        EndpointConfiguration conf = 
comp.createConfiguration("dataset:foo?minRate=3&produceDelay=33&consumeDelay=333&preloadSize=3333&initialDelay=33333&disableDataSetIndex=true");
+
+        assertEquals("Unexpected endpoint configuration value for minRate", 
"3", conf.getParameter("minRate"));
+        assertEquals("Unexpected endpoint configuration value for 
produceDelay", "33", conf.getParameter("produceDelay"));
+        assertEquals("Unexpected endpoint configuration value for 
consumeDelay", "333", conf.getParameter("consumeDelay"));
+        assertEquals("Unexpected endpoint configuration value for 
preloadSize", "3333", conf.getParameter("preloadSize"));
+        assertEquals("Unexpected endpoint configuration value for 
initialDelay", "33333", conf.getParameter("initialDelay"));
+        assertEquals("Unexpected endpoint configuration value for 
disableDataSetIndex", "true", conf.getParameter("disableDataSetIndex"));
 
         ComponentConfiguration compConf = comp.createComponentConfiguration();
         String json = compConf.createParameterJsonSchema();

http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java
index b255c6e..1e9c65b 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java
@@ -19,16 +19,21 @@ package org.apache.camel.component.dataset;
 import javax.naming.Context;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
 
 /**
  * @version 
  */
 public class DataSetConsumerTest extends ContextTestSupport {
-    protected SimpleDataSet dataSet = new SimpleDataSet(20);
+    protected SimpleDataSet dataSet = new SimpleDataSet(5);
 
     final String dataSetName = "foo";
     final String dataSetUri = "dataset://" + dataSetName;
+    final String dataSetUriWithDisableDataSetIndexSetToFalse = dataSetUri + 
"?disableDataSetIndex=false";
+    final String dataSetUriWithDisableDataSetIndexSetToTrue = dataSetUri + 
"?disableDataSetIndex=true";
     final String resultUri = "mock://result";
 
     @Override
@@ -41,6 +46,7 @@ public class DataSetConsumerTest extends ContextTestSupport {
     /**
      * Ensure the expected message count for a consumer-only endpoint defaults 
to zero
      */
+    @Test
     public void testConsumerOnlyEndpoint() throws Exception {
 
         context.addRoutes(new RouteBuilder() {
@@ -52,9 +58,11 @@ public class DataSetConsumerTest extends ContextTestSupport {
         });
         context.start();
 
-        assertEquals("expectedMessageCount should be -1 for a consumer-only 
endpoint", -1, getMockEndpoint(dataSetUri).getExpectedCount());
+        assertEquals("expectedMessageCount should be unset(i.e. -1) for a 
consumer-only endpoint", -1, getMockEndpoint(dataSetUri).getExpectedCount());
 
-        
getMockEndpoint(resultUri).expectedMessageCount((int)dataSet.getSize());
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) dataSet.getSize());
+        result.assertMessagesAscending(header(Exchange.DATASET_INDEX));
 
         assertMockEndpointsSatisfied();
     }
@@ -62,6 +70,7 @@ public class DataSetConsumerTest extends ContextTestSupport {
     /**
      * Ensure the expected message count for a consumer-producer endpoint 
defaults to the size of the dataset
      */
+    @Test
     public void testConsumerWithProducer() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
@@ -75,7 +84,66 @@ public class DataSetConsumerTest extends ContextTestSupport {
 
         assertEquals("expectedMessageCount should be the same as the DataSet 
size for a consumer-producer endpoint", dataSet.getSize(), 
getMockEndpoint(dataSetUri).getExpectedCount());
 
-        
getMockEndpoint(resultUri).expectedMessageCount((int)dataSet.getSize());
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) dataSet.getSize());
+        
result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // TODO:  Add tests for dataSetIndex URI parameters
+    @Test
+    public void testWithDisableDataSetIndexUriParameterUnset() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(dataSetUri)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) dataSet.getSize());
+        result.allMessages().header(Exchange.DATASET_INDEX).isNotNull();
+        
result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testWithDisableDataSetIndexUriParameterSetToFalse() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(dataSetUriWithDisableDataSetIndexSetToFalse)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) dataSet.getSize());
+        result.allMessages().header(Exchange.DATASET_INDEX).isNotNull();
+        
result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testWithDisableDataSetIndexUriParameterSetToTrue() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(dataSetUriWithDisableDataSetIndexSetToTrue)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) dataSet.getSize());
+        result.allMessages().header(Exchange.DATASET_INDEX).isNull();
 
         assertMockEndpointsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java
index c076e04..f3d930c 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java
@@ -20,6 +20,10 @@ import javax.naming.Context;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
 
 /**
  * @version 
@@ -29,6 +33,9 @@ public class DataSetProducerTest extends ContextTestSupport {
 
     final String dataSetName = "foo";
     final String dataSetUri = "dataset://" + dataSetName;
+    final String dataSetUriWithDisableDataSetIndexSetToFalse = dataSetUri + 
"?disableDataSetIndex=false";
+    final String dataSetUriWithDisableDataSetIndexSetToTrue = dataSetUri + 
"?disableDataSetIndex=true";
+    final String sourceUri = "direct://source";
     final String resultUri = "mock://result";
 
     @Override
@@ -38,22 +45,299 @@ public class DataSetProducerTest extends 
ContextTestSupport {
         return context;
     }
 
+    @Test
     public void testSendingMessagesExplicitlyToDataSetEndpoint() throws 
Exception {
         long size = dataSet.getSize();
         for (long i = 0; i < size; i++) {
-            template.sendBodyAndHeader(dataSetUri, "<hello>world!</hello>", 
Exchange.DATASET_INDEX, i);
+            template.sendBodyAndHeader(dataSetUri, dataSet.getDefaultBody(), 
Exchange.DATASET_INDEX, i);
         }
 
         assertMockEndpointsSatisfied();
     }
 
+    @Test
     public void 
testSendingMessagesExplicitlyToDataSetEndpointWithoutDataSetIndex() throws 
Exception {
         long size = dataSet.getSize();
         for (long i = 0; i < size; i++) {
-            template.sendBody(dataSetUri, "<hello>world!</hello>");
+            template.sendBody(dataSetUri, dataSet.getDefaultBody());
         }
 
         assertMockEndpointsSatisfied();
     }
 
+    /**
+     * Verify that the CamelDataSetIndex header is optional when the 
disableDataSetIndex parameter is unset
+     */
+    @Test
+    public void 
testNotSettingDataSetIndexHeaderWhenDisableDataSetIndexUriParameterIsUnset() 
throws Exception {
+        long size = dataSet.getSize();
+        for (long i = 0; i < size; i++) {
+            if (0 == (size % 2)) {
+                template.sendBodyAndHeader(dataSetUri, 
dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i);
+            } else {
+                template.sendBody(dataSetUri, dataSet.getDefaultBody());
+            }
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Verify that the CamelDataSetIndex header is optional when the 
disableDataSetIndex parameter is true
+     */
+    @Test
+    public void 
testNotSettingDataSetIndexHeaderWhenDisableDataSetIndexUriParameterSetToTrue() 
throws Exception {
+        long size = dataSet.getSize();
+        for (long i = 0; i < size; i++) {
+            if (0 == (size % 2)) {
+                
template.sendBodyAndHeader(dataSetUriWithDisableDataSetIndexSetToTrue, 
dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i);
+            } else {
+                template.sendBody(dataSetUriWithDisableDataSetIndexSetToTrue, 
dataSet.getDefaultBody());
+            }
+        }
+        for (long i = 0; i < size; i++) {
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Verify that the CamelDataSetIndex header is required when the 
disableDataSetIndex parameter is false
+     */
+    @Test
+    public void 
testNotSettingDataSetIndexHeaderWhenDisableDataSetIndexUriParameterSetToFalse() 
throws Exception {
+        long size = dataSet.getSize();
+        for (long i = 0; i < size; i++) {
+            template.sendBody(dataSetUriWithDisableDataSetIndexSetToFalse, 
dataSet.getDefaultBody());
+        }
+
+        try {
+            
getMockEndpoint(dataSetUriWithDisableDataSetIndexSetToFalse).assertIsSatisfied();
+        } catch (AssertionError assertionError) {
+            // Check as much of the string as possible - but the ExchangeID at 
the end will be unique
+            String expectedErrorString = 
dataSetUriWithDisableDataSetIndexSetToFalse
+                    + " Failed due to caught exception: "
+                    + NoSuchHeaderException.class.getName()
+                    + ": No '" + Exchange.DATASET_INDEX
+                    + "' header available of type: java.lang.Long. Exchange";
+            String actualErrorString = assertionError.getMessage();
+            if (actualErrorString.startsWith(expectedErrorString)) {
+                // This is what we expect
+                return;
+            } else {
+                throw assertionError;
+            }
+        }
+
+        fail("AssertionError should have been generated");
+    }
+
+    @Test
+    public void 
testSendingMessagesExplicitlyToDataSetEndpointWithoutDataSetIndexAndDisableDataSetIndexUriParameterSetToTrue()
 throws Exception {
+        long size = dataSet.getSize();
+        for (long i = 0; i < size; i++) {
+            template.sendBody(dataSetUriWithDisableDataSetIndexSetToTrue, 
dataSet.getDefaultBody());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDisableDataSetIndexUriParameterUnset() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sourceUri)
+                        .to(dataSetUri)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        long size = dataSet.getSize();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) size);
+        result.allMessages().header(Exchange.DATASET_INDEX).isNotNull();
+        
result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class));
+
+        for (long i = 0; i < size; i++) {
+            template.sendBody(sourceUri, dataSet.getDefaultBody());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        
result.assertMessagesAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class));
+    }
+
+    @Test
+    public void testDisableDataSetIndexUriParameterSetToTrue() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sourceUri)
+                        .to(dataSetUriWithDisableDataSetIndexSetToTrue)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        long size = dataSet.getSize();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) size);
+        result.allMessages().header(Exchange.DATASET_INDEX).isNull();
+
+        for (long i = 0; i < size; i++) {
+            template.sendBody(sourceUri, dataSet.getDefaultBody());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDisableDataSetIndexUriParameterSetToFalse() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sourceUri)
+                        .to(dataSetUriWithDisableDataSetIndexSetToFalse)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        long size = dataSet.getSize();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) size);
+        
result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class));
+        result.allMessages().header(Exchange.DATASET_INDEX).isNotNull();
+
+        for (long i = 0; i < size; i++) {
+            template.sendBodyAndHeader(sourceUri, dataSet.getDefaultBody(), 
Exchange.DATASET_INDEX, i);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+    @Test
+    public void 
testInvalidDataSetIndexValueWithDisableDataSetIndexUriParameterUnset() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sourceUri)
+                        .to(dataSetUri)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        long size = dataSet.getSize();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) size);
+        result.allMessages().header(Exchange.DATASET_INDEX).isNotNull();
+
+        for (long i = 0; i < size; i++) {
+            if (i == (size / 2)) {
+                template.sendBodyAndHeader(sourceUri, 
dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i + 10);
+            } else {
+                template.sendBody(sourceUri, dataSet.getDefaultBody());
+            }
+        }
+
+        try {
+            assertMockEndpointsSatisfied();
+        } catch (AssertionError assertionError) {
+            // Check as much of the string as possible - but the ExchangeID at 
the end will be unique
+            String expectedErrorString = dataSetUri + " Failed due to caught 
exception: "
+                    + AssertionError.class.getName()
+                    + ": Header: " + Exchange.DATASET_INDEX + " does not 
match. Expected: "
+                    + size / 2 + " but was: " + (size / 2 + 10) + " on 
Exchange";
+            String actualErrorString = assertionError.getMessage();
+            if (actualErrorString.startsWith(expectedErrorString)) {
+                // This is what we expect
+                return;
+            } else {
+                throw assertionError;
+            }
+        }
+
+        fail("AssertionError should have been generated");
+    }
+
+    @Test
+    public void 
testInvalidDataSetIndexValueWithDisableDataSetIndexUriParameterSetToTrue() 
throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sourceUri)
+                        .to(dataSetUriWithDisableDataSetIndexSetToTrue)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        long size = dataSet.getSize();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) size);
+
+        for (long i = 0; i < size; i++) {
+            if (i == (size / 2)) {
+                template.sendBodyAndHeader(sourceUri, 
dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i + 10);
+            } else {
+                template.sendBody(sourceUri, dataSet.getDefaultBody());
+            }
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void 
testInvalidDataSetIndexValueWithDisableDataSetIndexUriParameterSetToFalse() 
throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sourceUri)
+                        .to(dataSetUriWithDisableDataSetIndexSetToFalse)
+                        .to(resultUri);
+            }
+        });
+        context.start();
+
+        long size = dataSet.getSize();
+
+        MockEndpoint result = getMockEndpoint(resultUri);
+        result.expectedMessageCount((int) size);
+        result.allMessages().header(Exchange.DATASET_INDEX).isNotNull();
+
+        for (long i = 0; i < size; i++) {
+            if (i == (size / 2)) {
+                template.sendBodyAndHeader(sourceUri, 
dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i + 10);
+            } else {
+                template.sendBodyAndHeader(sourceUri, 
dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i);
+            }
+        }
+
+        try {
+            assertMockEndpointsSatisfied();
+        } catch (AssertionError assertionError) {
+            // Check as much of the string as possible - but the ExchangeID at 
the end will be unique
+            String expectedErrorString = 
dataSetUriWithDisableDataSetIndexSetToFalse + " Failed due to caught exception: 
"
+                    + AssertionError.class.getName() + ": Header: " + 
Exchange.DATASET_INDEX
+                    + " does not match. Expected: " + size / 2 + " but was: " 
+ (size / 2 + 10) + " on Exchange";
+            String actualErrorString = assertionError.getMessage();
+            if (actualErrorString.startsWith(expectedErrorString)) {
+                // This is what we expect
+                return;
+            } else {
+                throw assertionError;
+            }
+        }
+
+        fail("AssertionError should have been generated");
+    }
 }
\ No newline at end of file

Reply via email to