Added option to disable the CamelDataSetIndex header generation and processing
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b98c5c8b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b98c5c8b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b98c5c8b Branch: refs/heads/master Commit: b98c5c8b275741cc56caa1f201bfbe8400bcfba2 Parents: ad64911 Author: Quinn Stevenson <qu...@pronoia-solutions.com> Authored: Fri Mar 11 12:42:40 2016 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Mar 14 19:23:15 2016 +0100 ---------------------------------------------------------------------- .../component/dataset/DataSetEndpoint.java | 35 ++- ...ponentConfigurationAndDocumentationTest.java | 11 +- .../component/dataset/DataSetConsumerTest.java | 76 ++++- .../component/dataset/DataSetProducerTest.java | 288 ++++++++++++++++++- 4 files changed, 398 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java index d05f5a9..21fe5de 100644 --- a/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java @@ -60,6 +60,8 @@ public class DataSetEndpoint extends MockEndpoint implements Service { private long preloadSize; @UriParam(label = "consumer", defaultValue = "1000") private long initialDelay = 1000; + @UriParam(label = "consumer,producer", defaultValue = "null") + private Boolean disableDataSetIndex; @Deprecated public DataSetEndpoint() { @@ -120,8 +122,10 @@ public class DataSetEndpoint extends MockEndpoint implements Service { Exchange exchange = createExchange(); getDataSet().populateMessage(exchange, messageIndex); - Message in = exchange.getIn(); - in.setHeader(Exchange.DATASET_INDEX, messageIndex); + if (null == disableDataSetIndex || !disableDataSetIndex) { + Message in = exchange.getIn(); + in.setHeader(Exchange.DATASET_INDEX, messageIndex); + } return exchange; } @@ -206,6 +210,24 @@ public class DataSetEndpoint extends MockEndpoint implements Service { this.initialDelay = initialDelay; } + /** + * Flag to disable setting the CamelDataSetIndex header. + * For Consumers: + * - true => the header will not be set + * - false/null/unset => the header will be set + * For Producers: + * - true => the header value will not be verified, and will not be set if it is not present + * = false => the header value must be present and will be verified + * = null/unset => the header value will be verified if it is present, and will be set if it is not present + */ + public void setDisableDataSetIndex(boolean disableDataSetIndex) { + this.disableDataSetIndex = disableDataSetIndex; + } + + public boolean getDisableDataSetIndex() { + return disableDataSetIndex; + } + // Implementation methods //------------------------------------------------------------------------- @@ -234,7 +256,14 @@ public class DataSetEndpoint extends MockEndpoint implements Service { } protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception { - if (null != actual.getIn().getHeader(Exchange.DATASET_INDEX)) { + if (null == disableDataSetIndex) { + Long dataSetIndexHeaderValue = actual.getIn().getHeader(Exchange.DATASET_INDEX, Long.class); + if (null != dataSetIndexHeaderValue) { + assertEquals("Header: " + Exchange.DATASET_INDEX, index, dataSetIndexHeaderValue, actual); + } else { + actual.getIn().setHeader(Exchange.DATASET_INDEX, index); + } + } else if (!disableDataSetIndex) { long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class); assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual); } http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java index f0e0ecf..32a5ede 100644 --- a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetComponentConfigurationAndDocumentationTest.java @@ -33,9 +33,14 @@ public class DataSetComponentConfigurationAndDocumentationTest extends ContextTe @Test public void testComponentConfiguration() throws Exception { DataSetComponent comp = context.getComponent("dataset", DataSetComponent.class); - EndpointConfiguration conf = comp.createConfiguration("dataset:foo?minRate=3"); - - assertEquals("3", conf.getParameter("minRate")); + EndpointConfiguration conf = comp.createConfiguration("dataset:foo?minRate=3&produceDelay=33&consumeDelay=333&preloadSize=3333&initialDelay=33333&disableDataSetIndex=true"); + + assertEquals("Unexpected endpoint configuration value for minRate", "3", conf.getParameter("minRate")); + assertEquals("Unexpected endpoint configuration value for produceDelay", "33", conf.getParameter("produceDelay")); + assertEquals("Unexpected endpoint configuration value for consumeDelay", "333", conf.getParameter("consumeDelay")); + assertEquals("Unexpected endpoint configuration value for preloadSize", "3333", conf.getParameter("preloadSize")); + assertEquals("Unexpected endpoint configuration value for initialDelay", "33333", conf.getParameter("initialDelay")); + assertEquals("Unexpected endpoint configuration value for disableDataSetIndex", "true", conf.getParameter("disableDataSetIndex")); ComponentConfiguration compConf = comp.createComponentConfiguration(); String json = compConf.createParameterJsonSchema(); http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java index b255c6e..1e9c65b 100644 --- a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetConsumerTest.java @@ -19,16 +19,21 @@ package org.apache.camel.component.dataset; import javax.naming.Context; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; /** * @version */ public class DataSetConsumerTest extends ContextTestSupport { - protected SimpleDataSet dataSet = new SimpleDataSet(20); + protected SimpleDataSet dataSet = new SimpleDataSet(5); final String dataSetName = "foo"; final String dataSetUri = "dataset://" + dataSetName; + final String dataSetUriWithDisableDataSetIndexSetToFalse = dataSetUri + "?disableDataSetIndex=false"; + final String dataSetUriWithDisableDataSetIndexSetToTrue = dataSetUri + "?disableDataSetIndex=true"; final String resultUri = "mock://result"; @Override @@ -41,6 +46,7 @@ public class DataSetConsumerTest extends ContextTestSupport { /** * Ensure the expected message count for a consumer-only endpoint defaults to zero */ + @Test public void testConsumerOnlyEndpoint() throws Exception { context.addRoutes(new RouteBuilder() { @@ -52,9 +58,11 @@ public class DataSetConsumerTest extends ContextTestSupport { }); context.start(); - assertEquals("expectedMessageCount should be -1 for a consumer-only endpoint", -1, getMockEndpoint(dataSetUri).getExpectedCount()); + assertEquals("expectedMessageCount should be unset(i.e. -1) for a consumer-only endpoint", -1, getMockEndpoint(dataSetUri).getExpectedCount()); - getMockEndpoint(resultUri).expectedMessageCount((int)dataSet.getSize()); + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) dataSet.getSize()); + result.assertMessagesAscending(header(Exchange.DATASET_INDEX)); assertMockEndpointsSatisfied(); } @@ -62,6 +70,7 @@ public class DataSetConsumerTest extends ContextTestSupport { /** * Ensure the expected message count for a consumer-producer endpoint defaults to the size of the dataset */ + @Test public void testConsumerWithProducer() throws Exception { context.addRoutes(new RouteBuilder() { @Override @@ -75,7 +84,66 @@ public class DataSetConsumerTest extends ContextTestSupport { assertEquals("expectedMessageCount should be the same as the DataSet size for a consumer-producer endpoint", dataSet.getSize(), getMockEndpoint(dataSetUri).getExpectedCount()); - getMockEndpoint(resultUri).expectedMessageCount((int)dataSet.getSize()); + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) dataSet.getSize()); + result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class)); + + assertMockEndpointsSatisfied(); + } + + // TODO: Add tests for dataSetIndex URI parameters + @Test + public void testWithDisableDataSetIndexUriParameterUnset() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(dataSetUri) + .to(resultUri); + } + }); + context.start(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) dataSet.getSize()); + result.allMessages().header(Exchange.DATASET_INDEX).isNotNull(); + result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class)); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testWithDisableDataSetIndexUriParameterSetToFalse() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(dataSetUriWithDisableDataSetIndexSetToFalse) + .to(resultUri); + } + }); + context.start(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) dataSet.getSize()); + result.allMessages().header(Exchange.DATASET_INDEX).isNotNull(); + result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class)); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testWithDisableDataSetIndexUriParameterSetToTrue() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(dataSetUriWithDisableDataSetIndexSetToTrue) + .to(resultUri); + } + }); + context.start(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) dataSet.getSize()); + result.allMessages().header(Exchange.DATASET_INDEX).isNull(); assertMockEndpointsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/b98c5c8b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java index c076e04..f3d930c 100644 --- a/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetProducerTest.java @@ -20,6 +20,10 @@ import javax.naming.Context; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; /** * @version @@ -29,6 +33,9 @@ public class DataSetProducerTest extends ContextTestSupport { final String dataSetName = "foo"; final String dataSetUri = "dataset://" + dataSetName; + final String dataSetUriWithDisableDataSetIndexSetToFalse = dataSetUri + "?disableDataSetIndex=false"; + final String dataSetUriWithDisableDataSetIndexSetToTrue = dataSetUri + "?disableDataSetIndex=true"; + final String sourceUri = "direct://source"; final String resultUri = "mock://result"; @Override @@ -38,22 +45,299 @@ public class DataSetProducerTest extends ContextTestSupport { return context; } + @Test public void testSendingMessagesExplicitlyToDataSetEndpoint() throws Exception { long size = dataSet.getSize(); for (long i = 0; i < size; i++) { - template.sendBodyAndHeader(dataSetUri, "<hello>world!</hello>", Exchange.DATASET_INDEX, i); + template.sendBodyAndHeader(dataSetUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i); } assertMockEndpointsSatisfied(); } + @Test public void testSendingMessagesExplicitlyToDataSetEndpointWithoutDataSetIndex() throws Exception { long size = dataSet.getSize(); for (long i = 0; i < size; i++) { - template.sendBody(dataSetUri, "<hello>world!</hello>"); + template.sendBody(dataSetUri, dataSet.getDefaultBody()); } assertMockEndpointsSatisfied(); } + /** + * Verfiy that the CamelDataSetIndex header is optional when the disableDataSetIndex parameter is unset + */ + @Test + public void testNotSettingDataSetIndexHeaderWhenDisableDataSetIndexUriParameterIsUnset() throws Exception { + long size = dataSet.getSize(); + for (long i = 0; i < size; i++) { + if (0 == (size % 2)) { + template.sendBodyAndHeader(dataSetUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i); + } else { + template.sendBody(dataSetUri, dataSet.getDefaultBody()); + } + } + + assertMockEndpointsSatisfied(); + } + + /** + * Verfiy tha the CamelDataSetIndex header is optional when the disableDataSetIndex parameter is true + */ + @Test + public void testNotSettingDataSetIndexHeaderWhenDisableDataSetIndexUriParameterSetToTrue() throws Exception { + long size = dataSet.getSize(); + for (long i = 0; i < size; i++) { + if (0 == (size % 2)) { + template.sendBodyAndHeader(dataSetUriWithDisableDataSetIndexSetToTrue, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i); + } else { + template.sendBody(dataSetUriWithDisableDataSetIndexSetToTrue, dataSet.getDefaultBody()); + } + } + for (long i = 0; i < size; i++) { + } + + assertMockEndpointsSatisfied(); + } + + /** + * Verify tha the CamelDataSetIndex header is required when the disableDataSetIndex parameter is false + */ + @Test + public void testNotSettingDataSetIndexHeaderWhenDisableDataSetIndexUriParameterSetToFalse() throws Exception { + long size = dataSet.getSize(); + for (long i = 0; i < size; i++) { + template.sendBody(dataSetUriWithDisableDataSetIndexSetToFalse, dataSet.getDefaultBody()); + } + + try { + getMockEndpoint(dataSetUriWithDisableDataSetIndexSetToFalse).assertIsSatisfied(); + } catch (AssertionError assertionError) { + // Check as much of the string as possible - but the ExchangeID at the end will be unique + String expectedErrorString = dataSetUriWithDisableDataSetIndexSetToFalse + + " Failed due to caught exception: " + + NoSuchHeaderException.class.getName() + + ": No '" + Exchange.DATASET_INDEX + + "' header available of type: java.lang.Long. Exchange"; + String actualErrorString = assertionError.getMessage(); + if (actualErrorString.startsWith(expectedErrorString)) { + // This is what we expect + return; + } else { + throw assertionError; + } + } + + fail("AssertionError should have been generated"); + } + + @Test + public void testSendingMessagesExplicitlyToDataSetEndpointWithoutDataSetIndexAndDisableDataSetIndexUriParameterSetToTrue() throws Exception { + long size = dataSet.getSize(); + for (long i = 0; i < size; i++) { + template.sendBody(dataSetUriWithDisableDataSetIndexSetToTrue, dataSet.getDefaultBody()); + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testDisableDataSetIndexUriParameterUnset() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sourceUri) + .to(dataSetUri) + .to(resultUri); + } + }); + context.start(); + + long size = dataSet.getSize(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) size); + result.allMessages().header(Exchange.DATASET_INDEX).isNotNull(); + result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class)); + + for (long i = 0; i < size; i++) { + template.sendBody(sourceUri, dataSet.getDefaultBody()); + } + + assertMockEndpointsSatisfied(); + + result.assertMessagesAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class)); + } + + @Test + public void testDisableDataSetIndexUriParameterSetToTrue() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sourceUri) + .to(dataSetUriWithDisableDataSetIndexSetToTrue) + .to(resultUri); + } + }); + context.start(); + + long size = dataSet.getSize(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) size); + result.allMessages().header(Exchange.DATASET_INDEX).isNull(); + + for (long i = 0; i < size; i++) { + template.sendBody(sourceUri, dataSet.getDefaultBody()); + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testDisableDataSetIndexUriParameterSetToFalse() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sourceUri) + .to(dataSetUriWithDisableDataSetIndexSetToFalse) + .to(resultUri); + } + }); + context.start(); + + long size = dataSet.getSize(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) size); + result.expectsAscending(header(Exchange.DATASET_INDEX).convertTo(Number.class)); + result.allMessages().header(Exchange.DATASET_INDEX).isNotNull(); + + for (long i = 0; i < size; i++) { + template.sendBodyAndHeader(sourceUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i); + } + + assertMockEndpointsSatisfied(); + } + + + @Test + public void testInvalidDataSetIndexValueWithDisableDataSetIndexUriParameterUnset() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sourceUri) + .to(dataSetUri) + .to(resultUri); + } + }); + context.start(); + + long size = dataSet.getSize(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) size); + result.allMessages().header(Exchange.DATASET_INDEX).isNotNull(); + + for (long i = 0; i < size; i++) { + if (i == (size / 2)) { + template.sendBodyAndHeader(sourceUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i + 10); + } else { + template.sendBody(sourceUri, dataSet.getDefaultBody()); + } + } + + try { + assertMockEndpointsSatisfied(); + } catch (AssertionError assertionError) { + // Check as much of the string as possible - but the ExchangeID at the end will be unique + String expectedErrorString = dataSetUri + " Failed due to caught exception: " + + AssertionError.class.getName() + + ": Header: " + Exchange.DATASET_INDEX + " does not match. Expected: " + + size / 2 + " but was: " + (size / 2 + 10) + " on Exchange"; + String actualErrorString = assertionError.getMessage(); + if (actualErrorString.startsWith(expectedErrorString)) { + // This is what we expect + return; + } else { + throw assertionError; + } + } + + fail("AssertionError should have been generated"); + } + + @Test + public void testInvalidDataSetIndexValueWithDisableDataSetIndexUriParameterSetToTrue() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sourceUri) + .to(dataSetUriWithDisableDataSetIndexSetToTrue) + .to(resultUri); + } + }); + context.start(); + + long size = dataSet.getSize(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) size); + + for (long i = 0; i < size; i++) { + if (i == (size / 2)) { + template.sendBodyAndHeader(sourceUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i + 10); + } else { + template.sendBody(sourceUri, dataSet.getDefaultBody()); + } + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInvalidDataSetIndexValueWithDisableDataSetIndexUriParameterSetToFalse() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sourceUri) + .to(dataSetUriWithDisableDataSetIndexSetToFalse) + .to(resultUri); + } + }); + context.start(); + + long size = dataSet.getSize(); + + MockEndpoint result = getMockEndpoint(resultUri); + result.expectedMessageCount((int) size); + result.allMessages().header(Exchange.DATASET_INDEX).isNotNull(); + + for (long i = 0; i < size; i++) { + if (i == (size / 2)) { + template.sendBodyAndHeader(sourceUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i + 10); + } else { + template.sendBodyAndHeader(sourceUri, dataSet.getDefaultBody(), Exchange.DATASET_INDEX, i); + } + } + + try { + assertMockEndpointsSatisfied(); + } catch (AssertionError assertionError) { + // Check as much of the string as possible - but the ExchangeID at the end will be unique + String expectedErrorString = dataSetUriWithDisableDataSetIndexSetToFalse + " Failed due to caught exception: " + + AssertionError.class.getName() + ": Header: " + Exchange.DATASET_INDEX + + " does not match. Expected: " + size / 2 + " but was: " + (size / 2 + 10) + " on Exchange"; + String actualErrorString = assertionError.getMessage(); + if (actualErrorString.startsWith(expectedErrorString)) { + // This is what we expect + return; + } else { + throw assertionError; + } + } + + fail("AssertionError should have been generated"); + } } \ No newline at end of file