This is an automated email from the ASF dual-hosted git repository. ffang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push: new f43a4b4 [CAMEL-17758]add tests in camel-cassandraql-starter (#458) f43a4b4 is described below commit f43a4b40a226ba3a2a28b8716230669c5e5e150a Author: Freeman(Yue) Fang <freeman.f...@gmail.com> AuthorDate: Tue Mar 8 17:38:51 2022 -0500 [CAMEL-17758]add tests in camel-cassandraql-starter (#458) (cherry picked from commit c7335a6467079c6bc1901037681d351597834752) --- .../camel-cassandraql-starter/pom.xml | 70 ++++++ .../cassandra/MockLoadBalancingPolicy.java | 43 ++++ .../integration/CassandraComponentBeanRefIT.java | 92 ++++++++ .../integration/CassandraComponentConsumerIT.java | 134 ++++++++++++ .../integration/CassandraComponentProducerIT.java | 234 +++++++++++++++++++++ .../CassandraComponentProducerUnpreparedIT.java | 156 ++++++++++++++ .../cassandra/springboot/BaseCassandra.java | 113 ++++++++++ .../cassandra/CassandraAggregationIT.java | 123 +++++++++++ .../CassandraAggregationRepositoryIT.java | 222 +++++++++++++++++++ .../CassandraAggregationSerializedHeadersIT.java | 128 +++++++++++ .../NamedCassandraAggregationRepositoryIT.java | 227 ++++++++++++++++++++ .../camel/processor/aggregate/util/HeaderDto.java | 62 ++++++ .../cassandra/CassandraIdempotentIT.java | 96 +++++++++ .../cassandra/CassandraIdempotentRepositoryIT.java | 153 ++++++++++++++ .../NamedCassandraIdempotentRepositoryIT.java | 155 ++++++++++++++ .../src/test/resources/BasicDataSet.cql | 6 + .../src/test/resources/IdempotentDataSet.cql | 8 + .../src/test/resources/NamedIdempotentDataSet.cql | 8 + .../src/test/resources/initScript.cql | 33 +++ 19 files changed, 2063 insertions(+) diff --git a/components-starter/camel-cassandraql-starter/pom.xml b/components-starter/camel-cassandraql-starter/pom.xml index 347a961..36a0f63 100644 --- a/components-starter/camel-cassandraql-starter/pom.xml +++ b/components-starter/camel-cassandraql-starter/pom.xml @@ -39,6 +39,14 @@ <artifactId>camel-cassandraql</artifactId> 
<version>${camel-version}</version> </dependency> + <!-- test infra --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-cassandra</artifactId> + <version>${camel-version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> <!--START OF GENERATED CODE--> <dependency> <groupId>com.datastax.oss</groupId> @@ -56,4 +64,66 @@ </dependency> <!--END OF GENERATED CODE--> </dependencies> + <profiles> + <!-- activate integration test if the docker socket file is accessible --> + <profile> + <id>cassandraql-integration-tests-docker-file</id> + <activation> + <file> + <exists>/var/run/docker.sock</exists> + </file> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <visibleassertions.silence>true</visibleassertions.silence> + </systemPropertyVariables> + </configuration> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <!-- activate integration test if the DOCKER_HOST env var is set --> + <profile> + <id>cassandraql-integration-tests-docker-env</id> + <activation> + <property> + <name>env.DOCKER_HOST</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <visibleassertions.silence>true</visibleassertions.silence> + </systemPropertyVariables> + </configuration> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git 
a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java new file mode 100644 index 0000000..8a68c07 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.cassandra; + +import java.util.Queue; + +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.api.core.session.Session; +import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** Test load-balancing policy: delegates to {@link DefaultLoadBalancingPolicy} and records that a query plan was requested. */ public class MockLoadBalancingPolicy extends DefaultLoadBalancingPolicy { + + /** Set to {@code true} by {@link #newQueryPlan}; this class never resets it. */ public static boolean used; + + /** Passes both arguments straight to the superclass constructor. */ public MockLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) { + super(context, profileName); + } + + /** Flags the policy as used, then returns the query plan built by the superclass. */ @NonNull + @Override + public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) { + MockLoadBalancingPolicy.used = true; + return super.newQueryPlan(request, session); + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java new file mode 100644 index 0000000..7237352 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cassandra.integration; + + + + +import com.datastax.oss.driver.api.core.CqlSession; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.CassandraEndpoint; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +/** Integration test: a cql endpoint whose URI references the bean named cassandraSession must report the expected keyspace and CQL. */ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraComponentBeanRefIT.class, + CassandraComponentBeanRefIT.TestConfiguration.class + } +) +public class CassandraComponentBeanRefIT extends BaseCassandra { + + public static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)"; + public static final String SESSION_URI = "cql:bean:cassandraSession?cql=" + CQL; + + /** Registers the session returned by getSession() under the bean name used in SESSION_URI. */ @Bean("cassandraSession") + protected CqlSession createSession() { + + return getSession(); + } + + + + + /** Resolves SESSION_URI from the Camel context and checks the keyspace and CQL reported by the endpoint. */ @Test + public void testSession() { + + + CassandraEndpoint endpoint = context.getEndpoint(SESSION_URI,
CassandraEndpoint.class); + assertNotNull(endpoint, "No endpoint found for uri: " + SESSION_URI); + + assertEquals(KEYSPACE_NAME, endpoint.getKeyspace()); + assertEquals(CQL, endpoint.getCql()); + } + + // ************************************* + // Config + // ************************************* + + /** Declares the single route from direct:inputSession to SESSION_URI. */ @Configuration + public class TestConfiguration { + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:inputSession").to(SESSION_URI); + } + }; + } + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java new file mode 100644 index 0000000..676da47 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.camel.component.cassandra.integration; + + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.datastax.oss.driver.api.core.cql.Row; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +/** Integration tests for the cql consumer: three routes run the same select and each delivers to its own mock endpoint. */ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraComponentConsumerIT.class, + CassandraComponentConsumerIT.TestConfiguration.class + } +) +public class CassandraComponentConsumerIT extends BaseCassandra { + + static final String CQL = "select login, first_name, last_name from camel_user"; + + @EndpointInject("mock:resultAll") + MockEndpoint mock; + + @EndpointInject("mock:resultUnprepared") + MockEndpoint mockResulutUnprepared; + + @EndpointInject("mock:resultOne") + MockEndpoint mockResulutOne; + + + /** Expects at least one exchange on mock:resultAll and checks that its body is a List. */ @Test + public void testConsumeAll() throws Exception { + + mock.expectedMinimumMessageCount(1); + mock.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) { + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof List); + } + }); + mock.await(1, TimeUnit.SECONDS); + mock.assertIsSatisfied(); + } + + /** Same expectation as testConsumeAll, for the route configured with prepareStatements=false. */ @Test + public void testConsumeUnprepared() throws Exception { + +
mockResulutUnprepared.expectedMinimumMessageCount(1); + mockResulutUnprepared.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) { + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof List); + } + }); + mockResulutUnprepared.await(1, TimeUnit.SECONDS); + mockResulutUnprepared.assertIsSatisfied(); + } + + /** Expects at least one exchange on mock:resultOne (resultSetConversionStrategy=ONE) and checks that its body is a Row. */ @Test + public void testConsumeOne() throws Exception { + + mockResulutOne.expectedMinimumMessageCount(1); + mockResulutOne.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) { + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof Row); + } + }); + mockResulutOne.await(1, TimeUnit.SECONDS); /* wait on the same endpoint that is asserted below */ + + mockResulutOne.assertIsSatisfied(); + } + + // ************************************* + // Config + // ************************************* + + /** Declares the three consumer routes; they differ only in the URI options appended to the shared select. */ @Configuration + public class TestConfiguration { + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)).to("mock:resultAll"); + from(String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL)) + .to("mock:resultUnprepared"); + from(String.format("cql://%s/%s?cql=%s&resultSetConversionStrategy=ONE", getUrl(), KEYSPACE_NAME, CQL)) + .to("mock:resultOne"); + } + }; + } + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java new file mode 100644 index 0000000..f17e502 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cassandra.integration; + + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.update.Update; + + +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.CassandraConstants; +import org.apache.camel.component.cassandra.CassandraEndpoint; +import org.apache.camel.component.cassandra.MockLoadBalancingPolicy; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertNotNull; + + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +/** Integration tests for the cql producer: each test sends through one of the direct routes of TestConfiguration and then checks the reply or the camel_user table. */ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraComponentProducerIT.class, + CassandraComponentProducerIT.TestConfiguration.class + } +) +public class CassandraComponentProducerIT extends BaseCassandra { + + static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)"; + static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user"; + + @Produce("direct:input") + ProducerTemplate producerTemplate; + + @Produce("direct:inputNoParameter") + ProducerTemplate noParameterProducerTemplate; + + @Produce("direct:inputNotConsistent") + ProducerTemplate notConsistentProducerTemplate; + + @Produce("direct:loadBalancingPolicy") + ProducerTemplate loadBalancingPolicyTemplate; + + @Produce("direct:inputNoEndpointCql") + ProducerTemplate producerTemplateNoEndpointCql; + + + /** Sends the three insert parameters as the body, then reads the row back through the test session. */ @Test + public void testRequestUriCql() { + producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang")); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "w_jiang")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Willem", row.getString("first_name")); + assertEquals("Jiang", row.getString("last_name")); + } + + /** A null body sent to the parameterless select route must produce a List reply. */ @Test + public void testRequestNoParameterNull() { + Object response = noParameterProducerTemplate.requestBody(null); + + assertNotNull(response); + assertIsInstanceOf(List.class, response); + } + + /** An empty list body sent to the parameterless select route must produce a List reply. */ @Test + public void testRequestNoParameterEmpty()
{ + Object response = noParameterProducerTemplate.requestBody(Collections.emptyList()); + + assertNotNull(response); + assertIsInstanceOf(List.class, response); + } + + /** Passes an update statement as a String in the CassandraConstants.CQL_QUERY header, then reads the row back. */ @Test + public void testRequestMessageCql() { + producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY, + "update camel_user set first_name=?, last_name=? where login=?"); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + } + + /** Sends through the route configured with loadBalancingPolicyClass and checks that MockLoadBalancingPolicy.used was set. */ @Test + public void testLoadBalancing() { + loadBalancingPolicyTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, + CassandraConstants.CQL_QUERY, + "update camel_user set first_name=?, last_name=? where login=?"); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + + Assertions.assertTrue(MockLoadBalancingPolicy.used); + } + + /** + * Test with incoming message containing a header with RegularStatement.
+ */ + @Test + public void testRequestMessageStatement() { + + Update update = QueryBuilder.update("camel_user") + .setColumn("first_name", bindMarker()) + .setColumn("last_name", bindMarker()) + .whereColumn("login").isEqualTo(bindMarker()); + producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY, + update.build()); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + } + + /** + * Simulate different CQL statements in the incoming message containing a header with RegularStatement, justifying + * the cassandracql endpoint not containing a "cql" Uri parameter + */ + @Test + public void testEndpointNoCqlParameter() { + Update update = QueryBuilder.update("camel_user") + .setColumn("first_name", bindMarker()) + .whereColumn("login").isEqualTo(bindMarker()); + producerTemplateNoEndpointCql.sendBodyAndHeader(new Object[] { "Claus 2", "c_ibsen" }, CassandraConstants.CQL_QUERY, + update.build()); + + ResultSet resultSet1 = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row1 = resultSet1.one(); + assertNotNull(row1); + assertEquals("Claus 2", row1.getString("first_name")); + assertEquals("Ibsen", row1.getString("last_name")); + + update = QueryBuilder.update("camel_user") + .setColumn("last_name", bindMarker()) + .whereColumn("login").isEqualTo(bindMarker()); + producerTemplateNoEndpointCql.sendBodyAndHeader(new Object[] { "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY, + update.build()); + + ResultSet resultSet2 = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row2 =
resultSet2.one(); + assertNotNull(row2); + assertEquals("Claus 2", row2.getString("first_name")); + assertEquals("Ibsen 2", row2.getString("last_name")); + } + + /** Checks that consistencyLevel=ANY from the URI is reported by the endpoint, then sends an insert through that route. */ @Test + public void testRequestNotConsistent() { + CassandraEndpoint endpoint + = context.getEndpoint(String.format("cql://%s/%s?cql=%s&consistencyLevel=ANY", getUrl(), KEYSPACE_NAME, CQL), + CassandraEndpoint.class); + assertEquals(ConsistencyLevel.ANY, endpoint.getConsistencyLevel()); + + notConsistentProducerTemplate.requestBody(Arrays.asList("j_anstey", "Jonathan", "Anstey")); + } + + // ************************************* + // Config + // ************************************* + + /** Declares the direct routes used by the tests; all target getUrl() and KEYSPACE_NAME. */ @Configuration + public class TestConfiguration { + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:input").to(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)); + from("direct:inputNoParameter") + .to(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL)); + from("direct:loadBalancingPolicy").to(String.format( + "cql://%s/%s?cql=%s&loadBalancingPolicyClass=org.apache.camel.component.cassandra.MockLoadBalancingPolicy", + getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL)); + from("direct:inputNotConsistent") + .to(String.format("cql://%s/%s?cql=%s&consistencyLevel=ANY", getUrl(), KEYSPACE_NAME, CQL)); + from("direct:inputNoEndpointCql").to(String.format("cql://%s/%s", getUrl(), KEYSPACE_NAME)); + } + }; + } + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java new file mode 100644 index 0000000..c8bc4f1 --- /dev/null +++
b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cassandra.integration; + + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; +import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; + +import java.util.Arrays; +import java.util.List; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.update.Update; + +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.CassandraConstants; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import 
org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +/** Integration tests for the cql producer with prepareStatements=false on every route. */ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraComponentProducerUnpreparedIT.class, + CassandraComponentProducerUnpreparedIT.TestConfiguration.class + } +) +public class CassandraComponentProducerUnpreparedIT extends BaseCassandra { + + static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)"; + static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user"; + + + + + @Produce("direct:input") + ProducerTemplate producerTemplate; + + @Produce("direct:inputNoParameter") + ProducerTemplate noParameterProducerTemplate; + + + /** Sends the three insert parameters as the body, then reads the row back through the test session. */ @Test + public void testRequestUriCql() { + producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang")); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "w_jiang")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Willem", row.getString("first_name")); + assertEquals("Jiang", row.getString("last_name")); + } + + /** A null body sent to the parameterless select route must produce a List reply. */ @Test + public void testRequestNoParameterNull() { + Object response = noParameterProducerTemplate.requestBody(null); + + assertNotNull(response); + assertIsInstanceOf(List.class, response); + } + + /** An empty list body must produce a List reply; Collections is fully qualified because this file does not import it. */ @Test + public void testRequestNoParameterEmpty() { + Object response = noParameterProducerTemplate.requestBody(java.util.Collections.emptyList()); + + assertNotNull(response); + assertIsInstanceOf(List.class, response); + } + + /** Passes an update statement as a String in the CassandraConstants.CQL_QUERY header, then reads the row back. */ @Test + public void testRequestMessageCql() { + producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" },
CassandraConstants.CQL_QUERY, + "update camel_user set first_name=?, last_name=? where login=?"); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + } + + /** + * Test with incoming message containing a header with RegularStatement. + */ + @Test + public void testRequestMessageStatement() { + Update update = QueryBuilder.update("camel_user") + .setColumn("first_name", literal("Claus 2")) + .setColumn("last_name", literal("Ibsen 2")) + .whereColumn("login").isEqualTo(literal("c_ibsen")); + producerTemplate.requestBodyAndHeader(null, CassandraConstants.CQL_QUERY, update.build()); + + ResultSet resultSet = getSession() + .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen")); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + } + + + // ************************************* + // Config + // ************************************* + + /** Declares the two direct routes used by the tests, both with prepareStatements=false. */ @Configuration + public class TestConfiguration { + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + + from("direct:input") + .to(String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL)); + from("direct:inputNoParameter").to( + String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL)); + } + }; + } + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java new
file mode 100644 index 0000000..dd26263 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.cassandra.springboot; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.time.Duration; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; + +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.test.infra.cassandra.services.CassandraLocalContainerService; +import org.springframework.beans.factory.annotation.Autowired; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +/** Shared base for the Cassandra integration tests: registers the container service, loads BasicDataSet.cql before each test and closes the lazily created session after each test. */ public class BaseCassandra { + + @Autowired + protected CamelContext context; + + @Autowired + protected ProducerTemplate template; + + @RegisterExtension + public static CassandraLocalContainerService service; + + public static final String KEYSPACE_NAME = "camel_ks"; + public static final String DATACENTER_NAME = "datacenter1"; + + private CqlSession session; + + static { + service = new CassandraLocalContainerService(); + + service.getContainer() + .withInitScript("initScript.cql") + .withNetworkAliases("cassandra"); + + } + + /** Loads the basic data set before every test. */ @BeforeEach + public void beforeEach() throws Exception { + executeScript("BasicDataSet.cql"); + + } + + /** Reads the classpath resource, splits it on ';' and executes every fragment that is not blank; the stream is closed afterwards. */ public void executeScript(String pathToScript) throws IOException { + try (java.io.InputStream in = getClass().getResourceAsStream("/" + pathToScript)) { + for (String statement : IOUtils.toString(in, "UTF-8").split(";")) { + if (!statement.trim().isEmpty()) { /* skip whitespace-only fragments such as the newline after the last ';' */ + executeCql(statement); + } + } + } + } + + /** Executes one CQL statement on the session returned by getSession(). */ public void executeCql(String cql) { + getSession().execute(cql); + } + + /** Closes the session if one was opened; the field is always cleared so that the next getSession() call opens a fresh session. */ @AfterEach + protected void doPostTearDown() throws Exception { + try { + if (session != null) { + session.close(); + } + } catch (Exception
ignored) { + /* best-effort cleanup: a failing close must not fail the test */ + } finally { + session = null; + } + } + + /** Returns the cached session, opening one against the container host and CQL3 port on first use. */ public CqlSession getSession() { + if (session == null) { + InetSocketAddress endpoint + = new InetSocketAddress(service.getCassandraHost(), service.getCQL3Port()); + //create a new session + session = CqlSession.builder() + .withLocalDatacenter(DATACENTER_NAME) + .withKeyspace(KEYSPACE_NAME) + .withConfigLoader(DriverConfigLoader.programmaticBuilder() + .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5)).build()) + .addContactPoint(endpoint).build(); + } + return session; + } + + /** Returns the CQL3 endpoint string reported by the container service; the tests use it as the host part of cql URIs. */ public String getUrl() { + return service.getCQL3Endpoint(); + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java new file mode 100644 index 0000000..07fbcca --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.camel.processor.aggregate.cassandra; + + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraAggregationIT.class, + CassandraAggregationIT.TestConfiguration.class + } +) +public class CassandraAggregationIT extends BaseCassandra { + + CassandraAggregationRepository aggregationRepository; + + + @EndpointInject("mock:output") + MockEndpoint mockOutput; + + @BeforeEach + protected void doPreSetup() throws Exception { + aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID"); + aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION"); + aggregationRepository.start(); + + } + + + @AfterEach + public void tearDown() throws Exception { + + aggregationRepository.stop(); + } + + private void send(String aggregationId, String body) { + super.template.sendBodyAndHeader("direct:input", body, "aggregationId", aggregationId); + } + + @Test + public void testAggregationRoute() throws Exception { + // Given + + mockOutput.expectedMessageCount(2); + mockOutput.expectedBodiesReceivedInAnyOrder("A,C,E", "B,D"); + // When + send("1", "A"); + send("2", 
"B"); + send("1", "C"); + send("2", "D"); + send("1", "E"); + // Then + mockOutput.assertIsSatisfied(4000L); + } + + + // ************************************* + // Config + // ************************************* + + @Configuration + public class TestConfiguration { + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + AggregationStrategy aggregationStrategy = new AggregationStrategy() { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String oldBody = oldExchange.getIn().getBody(String.class); + String newBody = newExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(oldBody + "," + newBody); + return oldExchange; + } + }; + from("direct:input").aggregate(header("aggregationId"), aggregationStrategy).completionSize(3) + .completionTimeout(3000L).aggregationRepository(aggregationRepository) + .to("mock:output"); + } + }; + } + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java new file mode 100644 index 0000000..ba3e3ff --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.camel.processor.aggregate.cassandra;


import java.util.Set;

import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.component.cassandra.springboot.BaseCassandra;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.boot.CamelAutoConfiguration;
import org.apache.camel.support.DefaultExchange;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;


/**
 * Exercises the operations of {@link CassandraAggregationRepository} against the
 * {@code CAMEL_AGGREGATION} table.
 */
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
@CamelSpringBootTest
@SpringBootTest(
    classes = {
        CamelAutoConfiguration.class,
        CassandraAggregationRepositoryIT.class
    }
)
public class CassandraAggregationRepositoryIT extends BaseCassandra {

    CassandraAggregationRepository aggregationRepository;


    @EndpointInject("mock:output")
    MockEndpoint mockOutput;

    /** Creates and starts a fresh repository on the session of the current test. */
    @BeforeEach
    protected void doPreSetup() throws Exception {
        aggregationRepository = new CassandraAggregationRepository(getSession());
        aggregationRepository.start();
    }

    /** Stops the repository created for the test. */
    @AfterEach
    public void tearDown() throws Exception {
        aggregationRepository.stop();
    }

    /**
     * Tells whether a row with the given key is present in {@code CAMEL_AGGREGATION}.
     */
    private boolean exists(String key) {
        String cql = String.format("select KEY from CAMEL_AGGREGATION where KEY='%s'", key);
        return getSession().execute(cql).one() != null;
    }

    /** Stores one exchange per key; each exchange id is {@code "Exchange-" + key}. */
    private void addExchanges(String... keys) {
        for (String k : keys) {
            Exchange stored = new DefaultExchange(context);
            stored.setExchangeId("Exchange-" + k);
            aggregationRepository.add(context, k, stored);
        }
    }

    @Test
    public void testAdd() {
        // Given
        String key = "Add";
        assertFalse(exists(key));
        Exchange toStore = new DefaultExchange(context);
        // When
        aggregationRepository.add(context, key, toStore);
        // Then
        assertTrue(exists(key));
    }

    @Test
    public void testGetExists() {
        // Given
        String key = "Get_Exists";
        Exchange stored = new DefaultExchange(context);
        aggregationRepository.add(context, key, stored);
        assertTrue(exists(key));
        // When
        Exchange loaded = aggregationRepository.get(context, key);
        // Then
        assertNotNull(loaded);
        assertEquals(stored.getExchangeId(), loaded.getExchangeId());
    }

    @Test
    public void testGetNotExists() {
        // Given
        String key = "Get_NotExists";
        assertFalse(exists(key));
        // When
        Exchange loaded = aggregationRepository.get(context, key);
        // Then
        assertNull(loaded);
    }

    @Test
    public void testRemoveExists() {
        // Given
        String key = "Remove_Exists";
        Exchange stored = new DefaultExchange(context);
        aggregationRepository.add(context, key, stored);
        assertTrue(exists(key));
        // When
        aggregationRepository.remove(context, key, stored);
        // Then
        assertFalse(exists(key));
    }

    @Test
    public void testRemoveNotExists() {
        // Given
        String key = "RemoveNotExists";
        Exchange neverStored = new DefaultExchange(context);
        assertFalse(exists(key));
        // When
        aggregationRepository.remove(context, key, neverStored);
        // Then
        assertFalse(exists(key));
    }

    @Test
    public void testGetKeys() {
        // Given
        String[] keys = { "GetKeys1", "GetKeys2" };
        addExchanges(keys);
        // When
        Set<String> found = aggregationRepository.getKeys();
        // Then
        for (String expected : keys) {
            assertTrue(found.contains(expected));
        }
    }

    @Test
    public void testConfirmExist() {
        // Given
        for (int n = 1; n <= 3; n++) {
            String key = "Confirm_" + n;
            Exchange stored = new DefaultExchange(context);
            stored.setExchangeId("Exchange_" + n);
            aggregationRepository.add(context, key, stored);
            assertTrue(exists(key));
        }
        // When
        aggregationRepository.confirm(context, "Exchange_2");
        // Then
        assertTrue(exists("Confirm_1"));
        assertFalse(exists("Confirm_2"));
        assertTrue(exists("Confirm_3"));
    }

    @Test
    public void testConfirmNotExist() {
        // Given
        String[] keys = { "Confirm1", "Confirm2", "Confirm3" };
        addExchanges(keys);
        for (String stored : keys) {
            assertTrue(exists(stored));
        }
        // When
        aggregationRepository.confirm(context, "Exchange-Confirm5");
        // Then
        for (String stored : keys) {
            assertTrue(exists(stored));
        }
    }

    @Test
    public void testScan() {
        // Given
        String[] keys = { "Scan1", "Scan2" };
        addExchanges(keys);
        // When
        Set<String> scannedIds = aggregationRepository.scan(context);
        // Then
        for (String k : keys) {
            assertTrue(scannedIds.contains("Exchange-" + k));
        }
    }

    @Test
    public void testRecover() {
        // Given
        String[] keys = { "Recover1", "Recover2" };
        addExchanges(keys);
        // When
        Exchange known = aggregationRepository.recover(context, "Exchange-Recover2");
        Exchange unknown = aggregationRepository.recover(context, "Exchange-Recover3");
        // Then
        assertNotNull(known);
        assertNull(unknown);
    }

}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java
b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java new file mode 100644 index 0000000..3d6c7cc --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregate.cassandra; + + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.util.HeaderDto; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraAggregationSerializedHeadersIT.class, + CassandraAggregationSerializedHeadersIT.TestConfiguration.class + } +) +public class CassandraAggregationSerializedHeadersIT extends BaseCassandra { + + CassandraAggregationRepository aggregationRepository; + + + @EndpointInject("mock:output") + MockEndpoint mockOutput; + + @BeforeEach + protected void doPreSetup() throws Exception { + aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID"); + aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION"); + aggregationRepository.setAllowSerializedHeaders(true); + aggregationRepository.start(); + + } + + + @AfterEach + public void tearDown() throws Exception { + + aggregationRepository.stop(); + } + + private void send(HeaderDto aggregationId, String body) { + template.sendBodyAndHeader("direct:input", body, "aggregationId", aggregationId); + } + + @Test + public void testAggregationRoute() throws 
Exception { + // Given + + mockOutput.expectedMessageCount(2); + mockOutput.expectedBodiesReceivedInAnyOrder("A,C,E", "B,D"); + HeaderDto dto1 = new HeaderDto("org", "company", 1); + HeaderDto dto2 = new HeaderDto("org", "company", 2); + // When + send(dto1, "A"); + send(dto2, "B"); + send(dto1, "C"); + send(dto2, "D"); + send(dto1, "E"); + // Then + mockOutput.assertIsSatisfied(4000L); + + } + + + // ************************************* + // Config + // ************************************* + + @Configuration + public class TestConfiguration { + + @Bean + public RouteBuilder routeBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + AggregationStrategy aggregationStrategy = new AggregationStrategy() { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String oldBody = oldExchange.getIn().getBody(String.class); + String newBody = newExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(oldBody + "," + newBody); + return oldExchange; + } + }; + from("direct:input").aggregate(header("aggregationId"), aggregationStrategy).completionSize(3) + .completionTimeout(3000L).aggregationRepository(aggregationRepository) + .to("mock:output"); + } + }; + } + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java new file mode 100644 index 0000000..d1afc1c --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.cassandra; + + +import java.util.Set; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.support.DefaultExchange; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + NamedCassandraAggregationRepositoryIT.class + } +) +public class NamedCassandraAggregationRepositoryIT extends 
BaseCassandra { + + CassandraAggregationRepository aggregationRepository; + + + @EndpointInject("mock:output") + MockEndpoint mockOutput; + + @BeforeEach + protected void doPreSetup() throws Exception { + aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID"); + aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION"); + aggregationRepository.start(); + + } + + + @AfterEach + public void tearDown() throws Exception { + + aggregationRepository.stop(); + } + + private boolean exists(String key) { + return getSession().execute(String.format("select KEY from NAMED_CAMEL_AGGREGATION where NAME='ID' and KEY='%s'", key)) + .one() + != null; + } + + @Test + public void testAdd() { + // Given + String key = "Add"; + assertFalse(exists(key)); + Exchange exchange = new DefaultExchange(context); + // When + aggregationRepository.add(context, key, exchange); + // Then + assertTrue(exists(key)); + } + + @Test + public void testGetExists() { + // Given + String key = "Get_Exists"; + Exchange exchange = new DefaultExchange(context); + aggregationRepository.add(context, key, exchange); + assertTrue(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(context, key); + // Then + assertNotNull(exchange2); + assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); + } + + @Test + public void testGetNotExists() { + // Given + String key = "Get_NotExists"; + assertFalse(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(context, key); + // Then + assertNull(exchange2); + } + + @Test + public void testRemoveExists() { + // Given + String key = "Remove_Exists"; + Exchange exchange = new DefaultExchange(context); + aggregationRepository.add(context, key, exchange); + assertTrue(exists(key)); + // When + aggregationRepository.remove(context, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testRemoveNotExists() { + // Given + String key = "RemoveNotExists"; + Exchange exchange = 
new DefaultExchange(context); + assertFalse(exists(key)); + // When + aggregationRepository.remove(context, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testGetKeys() { + // Given + String[] keys = { "GetKeys1", "GetKeys2" }; + addExchanges(keys); + // When + Set<String> keySet = aggregationRepository.getKeys(); + // Then + for (String key : keys) { + assertTrue(keySet.contains(key)); + } + } + + @Test + public void testConfirmExist() { + // Given + for (int i = 1; i < 4; i++) { + String key = "Confirm_" + i; + Exchange exchange = new DefaultExchange(context); + exchange.setExchangeId("Exchange_" + i); + aggregationRepository.add(context, key, exchange); + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(context, "Exchange_2"); + // Then + assertTrue(exists("Confirm_1")); + assertFalse(exists("Confirm_2")); + assertTrue(exists("Confirm_3")); + } + + @Test + public void testConfirmNotExist() { + // Given + String[] keys = new String[3]; + for (int i = 1; i < 4; i++) { + keys[i - 1] = "Confirm" + i; + } + addExchanges(keys); + for (String key : keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(context, "Exchange-Confirm5"); + // Then + for (String key : keys) { + assertTrue(exists(key)); + } + } + + private void addExchanges(String... 
keys) { + for (String key : keys) { + Exchange exchange = new DefaultExchange(context); + exchange.setExchangeId("Exchange-" + key); + aggregationRepository.add(context, key, exchange); + } + } + + @Test + public void testScan() { + // Given + String[] keys = { "Scan1", "Scan2" }; + addExchanges(keys); + // When + Set<String> exchangeIdSet = aggregationRepository.scan(context); + // Then + for (String key : keys) { + assertTrue(exchangeIdSet.contains("Exchange-" + key)); + } + } + + @Test + public void testRecover() { + // Given + String[] keys = { "Recover1", "Recover2" }; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(context, "Exchange-Recover2"); + Exchange exchange3 = aggregationRepository.recover(context, "Exchange-Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } + +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java new file mode 100644 index 0000000..6010643 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.util; + +import java.io.Serializable; + +public class HeaderDto implements Cloneable, Serializable { + + private static final long serialVersionUID = -5004840651888298047L; + private String org; + private String type; + private int key; + + public HeaderDto(String org, String type, int key) { + this.org = org; + this.type = type; + this.key = key; + } + + public int getKey() { + return key; + } + + public void setKey(int key) { + this.key = key; + } + + public String getOrg() { + return org; + } + + public void setOrg(String org) { + this.org = org; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public String toString() { + return "HeaderDto [org=" + org + ", type=" + type + ", key=" + key + "]"; + } +} diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java new file mode 100644 index 0000000..2f54a58 --- /dev/null +++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.cassandra; + + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.springboot.BaseCassandra; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.boot.CamelAutoConfiguration; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.annotation.DirtiesContext; +import org.apache.camel.test.spring.junit5.CamelSpringBootTest; + + +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +@CamelSpringBootTest +@SpringBootTest( + classes = { + CamelAutoConfiguration.class, + CassandraIdempotentIT.class, + CassandraIdempotentIT.TestConfiguration.class + } +) +public class CassandraIdempotentIT extends BaseCassandra { + + CassandraIdempotentRepository idempotentRepository; + + + @EndpointInject("mock:output") + MockEndpoint mockOutput; + + + + + private void send(String idempotentId, String body) { + super.template.sendBodyAndHeader("direct:input", body, "idempotentId", idempotentId); + } + + @Test + public void testIdempotentRoute() throws Exception { + // Given + + mockOutput.expectedMessageCount(2); + mockOutput.expectedBodiesReceivedInAnyOrder("A", "B"); + // When + send("1", "A"); + send("2", "B"); + send("1", "A"); + send("2", "B"); + send("1", "A"); + // Then + 
mockOutput.assertIsSatisfied();
+
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    idempotentRepository = new NamedCassandraIdempotentRepository(getSession(), "ID");
+                    idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+                    idempotentRepository.start();
+                    from("direct:input").idempotentConsumer(header("idempotentId"), idempotentRepository).to("mock:output");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java
new file mode 100644
index 0000000..3313cc8
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+/** Integration tests for {@link CassandraIdempotentRepository}; results are cross-checked by querying CAMEL_IDEMPOTENT directly. */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraIdempotentRepositoryIT.class
+    }
+)
+public class CassandraIdempotentRepositoryIT extends BaseCassandra {
+
+    CassandraIdempotentRepository idempotentRepository;
+
+
+    /**
+     * Creates and starts the repository under test on the session returned by getSession().
+     */
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        idempotentRepository = new CassandraIdempotentRepository(getSession());
+        idempotentRepository.start();
+
+    }
+
+    @Override
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        executeScript("IdempotentDataSet.cql");
+    }
+
+    /** Stops the repository started in doPreSetup(). */
+    @AfterEach
+    public void tearDown() throws Exception {
+        idempotentRepository.stop();
+    }
+
+    private boolean exists(String key) {
+        return getSession().execute(String.format("select KEY from CAMEL_IDEMPOTENT where KEY='%s'", key)).one() != null;
+    }
+
+    @Test
+    public void testAddNotExists() {
+        // Given
+        String key = "Add_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then
+        assertTrue(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testAddExists() {
+        // Given
+        String key = "Add_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then
+        assertFalse(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testContainsNotExists() {
+        // Given
+        String key = "Contains_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertFalse(result);
+    }
+
+    @Test
+    public void testContainsExists() {
+        // Given
+        String key = "Contains_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "Remove_NotExists";
+        assertFalse(exists(key));
+        // When: removing a key that was never stored
+        boolean result = idempotentRepository.remove(key);
+        // Then: the repository reports that nothing was removed
+        assertFalse(result);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.remove(key);
+        // Then
+        assertTrue(result);
+    }
+
+    @Test
+    public void testClear() {
+        // Given
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When
+        idempotentRepository.clear();
+        // Then
+        assertFalse(idempotentRepository.contains(key));
+    }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java
new file mode 100644
index 0000000..38a224b
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+/** Integration tests for {@link NamedCassandraIdempotentRepository} (name "ID", table NAMED_CAMEL_IDEMPOTENT). */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        NamedCassandraIdempotentRepositoryIT.class
+    }
+)
+public class NamedCassandraIdempotentRepositoryIT extends BaseCassandra {
+
+    CassandraIdempotentRepository idempotentRepository;
+
+
+    /**
+     * Creates the named repository on getSession(), points it at NAMED_CAMEL_IDEMPOTENT and starts it.
+     */
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        idempotentRepository = new NamedCassandraIdempotentRepository(getSession(), "ID");
+        idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+        idempotentRepository.start();
+
+    }
+
+    @Override
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        executeScript("NamedIdempotentDataSet.cql");
+    }
+
+    /** Stops the repository started in doPreSetup(). */
+    @AfterEach
+    public void tearDown() throws Exception {
+        idempotentRepository.stop();
+    }
+
+    private boolean exists(String key) {
+        return getSession().execute(String.format("select KEY from NAMED_CAMEL_IDEMPOTENT where NAME='ID' and KEY='%s'", key))
+                .one()
+               != null;
+    }
+
+    @Test
+    public void testAddNotExists() {
+        // Given
+        String key = "Add_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then
+        assertTrue(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testAddExists() {
+        // Given
+        String key = "Add_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then
+        assertFalse(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testContainsNotExists() {
+        // Given
+        String key = "Contains_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertFalse(result);
+    }
+
+    @Test
+    public void testContainsExists() {
+        // Given
+        String key = "Contains_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "Remove_NotExists";
+        assertFalse(exists(key));
+        // When: removing a key that was never stored
+        boolean result = idempotentRepository.remove(key);
+        // Then: the repository reports that nothing was removed
+        assertFalse(result);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.remove(key);
+        // Then
+        assertTrue(result);
+    }
+
+    @Test
+    public void testClear() {
+        // Given
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When
+        idempotentRepository.clear();
+        // Then
+        assertFalse(idempotentRepository.contains(key));
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql
new file mode
100644
index 0000000..9054e40
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql
@@ -0,0 +1,6 @@
+TRUNCATE camel_user;
+
+INSERT INTO camel_user(login, first_name, last_name)
+  VALUES('j_strachan','James','Strachan');
+INSERT INTO camel_user(login, first_name, last_name)
+  VALUES('c_ibsen','Claus','Ibsen');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql
new file mode 100644
index 0000000..2d4b974
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql
@@ -0,0 +1,8 @@
+TRUNCATE CAMEL_IDEMPOTENT;
+
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+  VALUES('Add_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+  VALUES('Contains_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+  VALUES('Remove_Exists');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql
new file mode 100644
index 0000000..a8adebd
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql
@@ -0,0 +1,8 @@
+TRUNCATE NAMED_CAMEL_IDEMPOTENT;
+
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+  VALUES('ID','Add_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+  VALUES('ID','Contains_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+  VALUES('ID','Remove_Exists');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql b/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql
new file mode 100644
index 0000000..0253f9b
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql
@@ -0,0 +1,33 @@
+CREATE KEYSPACE IF NOT EXISTS camel_ks WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
+
+CREATE TABLE IF NOT EXISTS camel_ks.camel_user (
+  login varchar PRIMARY KEY,
+  first_name varchar,
+  last_name varchar
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.NAMED_CAMEL_AGGREGATION (
+  NAME varchar,
+  KEY varchar,
+  EXCHANGE_ID varchar,
+  EXCHANGE blob,
+  PRIMARY KEY (NAME, KEY)
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.NAMED_CAMEL_IDEMPOTENT (
+  NAME varchar,
+  KEY varchar,
+  PRIMARY KEY (NAME, KEY)
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.CAMEL_IDEMPOTENT (
+  KEY varchar,
+  PRIMARY KEY (KEY)
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.CAMEL_AGGREGATION (
+  KEY varchar,
+  EXCHANGE_ID varchar,
+  EXCHANGE blob,
+  PRIMARY KEY (KEY)
+);