Repository: camel Updated Branches: refs/heads/master d5253916c -> 7764a5193
CAMEL-7700: Limit the SQL component maxMessagesPerPoll option by using the jdbcTemplate.setMaxRows(size) method Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7764a519 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7764a519 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7764a519 Branch: refs/heads/master Commit: 7764a519355bdd0b13c36817022a2a4819eeb93a Parents: d525391 Author: cmueller <cmuel...@apache.org> Authored: Thu Aug 14 23:35:58 2014 +0200 Committer: cmueller <cmuel...@apache.org> Committed: Thu Aug 14 23:36:04 2014 +0200 ---------------------------------------------------------------------- .../apache/camel/component/sql/SqlConsumer.java | 17 ++-- .../sql/SqlConsumerMaxMessagesPerPollTest.java | 101 +++++++++++++++++++ .../sql/createAndPopulateDatabase4.sql | 21 ++++ 3 files changed, 133 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7764a519/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index 57e80e5..8f9800c 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -36,9 +36,6 @@ import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; -/** - * - */ public class SqlConsumer extends ScheduledBatchPollingConsumer { private final String query; @@ -165,9 +162,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { int total = exchanges.size(); // limit if needed - if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { - log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll."); - total = maxMessagesPerPoll; + if (maxMessagesPerPoll > 0 && total == maxMessagesPerPoll) { + log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was more messages in this poll."); } for (int index = 0; index < total && isBatchAllowed(); index++) { @@ -311,4 +307,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) { this.breakBatchOnConsumeFail = breakBatchOnConsumeFail; } + + @Override + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + super.setMaxMessagesPerPoll(maxMessagesPerPoll); + + if (jdbcTemplate != null) { + jdbcTemplate.setMaxRows(maxMessagesPerPoll); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/7764a519/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java new file mode 100644 index 0000000..51a7bb0 --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import java.util.List; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +public class SqlConsumerMaxMessagesPerPollTest extends CamelTestSupport { + + private EmbeddedDatabase db; + + @Before + public void setUp() throws Exception { + db = new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY) + .addScript("sql/createAndPopulateDatabase4.sql") + .build(); + + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + db.shutdown(); + } + + @Test + public void maxMessagesPerPoll() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(3); + assertMockEndpointsSatisfied(); + + List<Exchange> exchanges = mock.getReceivedExchanges(); + assertBodyMapValue(1, "ID", exchanges.get(0)); + assertBodyMapValue("Camel", "PROJECT", exchanges.get(0)); + assertProperty(0, "CamelBatchIndex", exchanges.get(0)); + assertProperty(2, "CamelBatchSize", exchanges.get(0)); + assertProperty(Boolean.FALSE, "CamelBatchComplete", exchanges.get(0)); + + assertBodyMapValue(2, "ID", exchanges.get(1)); + assertBodyMapValue("AMQ", "PROJECT", exchanges.get(1)); + assertProperty(1, "CamelBatchIndex", exchanges.get(1)); + assertProperty(2, "CamelBatchSize", exchanges.get(1)); + assertProperty(Boolean.TRUE, "CamelBatchComplete", exchanges.get(1)); // end of the first batch + + assertBodyMapValue(3, "ID", exchanges.get(2)); + assertBodyMapValue("Linux", "PROJECT", exchanges.get(2)); + assertProperty(0, "CamelBatchIndex", exchanges.get(2)); // the second batch + assertProperty(1, "CamelBatchSize", exchanges.get(2)); // only one entry in this batch + assertProperty(Boolean.TRUE, "CamelBatchComplete", exchanges.get(2)); // there are no more entries yet + } + + private void assertProperty(Object value, String propertyName, Exchange exchange) { + assertEquals(value, exchange.getProperty(propertyName)); + } + + private void assertBodyMapValue(Object value, String key, Exchange exchange) { + assertEquals(value, exchange.getIn().getBody(Map.class).get(key)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + getContext().setTracing(true); + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("sql:select * from projects where processed = false order by id?maxMessagesPerPoll=2") + .to("mock:result") + .to("sql:update projects set processed = true where id = :#id"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7764a519/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql b/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql new file mode 100644 index 0000000..e8c6bd9 --- /dev/null +++ b/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql @@ -0,0 +1,21 @@ +-- ------------------------------------------------------------------------ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- ------------------------------------------------------------------------ + +create table projects (id integer primary key GENERATED ALWAYS AS IDENTITY, project varchar(10), license varchar(5), description varchar(1000) default null, processed boolean); +insert into projects (project, license, description, processed) values ('Camel', 'ASF', '', false); +insert into projects (project, license, description, processed) values ('AMQ', 'ASF', '', false); +insert into projects (project, license, description, processed) values ('Linux', 'XXX', '', false);