Updated Branches: refs/heads/master 099791206 -> 7ac3001e3
CAMEL-6194: Added @PreConsumed to camel-jpa. Thanks to Matt McCann for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7ac3001e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7ac3001e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7ac3001e Branch: refs/heads/master Commit: 7ac3001e3f4ac0564383c39047e0d532435cf63b Parents: 0997912 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Oct 12 10:44:45 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Oct 12 10:45:50 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/jpa/Consumed.java | 2 +- .../camel/component/jpa/DeleteHandler.java | 3 +- .../apache/camel/component/jpa/JpaConsumer.java | 53 +++++++++++++++- .../apache/camel/component/jpa/PreConsumed.java | 33 ++++++++++ .../org/apache/camel/examples/SendEmail.java | 13 ++++ .../camel/processor/jpa/JpaPreConsumedTest.java | 65 ++++++++++++++++++++ 6 files changed, 165 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7ac3001e/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/Consumed.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/Consumed.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/Consumed.java index 60c9bd4..94af889 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/Consumed.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/Consumed.java @@ -23,7 +23,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * An annotation to mark a method to be invoked when an entity bean has been succesfully processed + * An annotation to mark a method to be invoked when an entity bean has been successfully processed * by a Camel consumer and when the routing is done; so that it can be updated in some way to remove it from the query set. * <p/> * For example a method may be marked to set an active flag to false or to update some status value to the next step in a workflow http://git-wip-us.apache.org/repos/asf/camel/blob/7ac3001e/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java index b026073..f62da9d 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java @@ -25,11 +25,12 @@ import javax.persistence.EntityManager; * @version */ public interface DeleteHandler<T> { + /** * Deletes the entity bean after it has been processed either by actually * deleting the object or updating it in a way so that future queries do not return this object again. * - * @param entityManager + * @param entityManager the entity manager * @param entityBean the entity bean that has been processed and should be deleted */ void deleteObject(EntityManager entityManager, Object entityBean); http://git-wip-us.apache.org/repos/asf/camel/blob/7ac3001e/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index 8cfd3e8..e94c113 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -51,6 +51,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { private final TransactionTemplate transactionTemplate; private QueryFactory queryFactory; private DeleteHandler<Object> deleteHandler; + private DeleteHandler<Object> preDeleteHandler; private String query; private String namedQuery; private String nativeQuery; @@ -158,6 +159,9 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; if (lockEntity(result, entityManager)) { + // Run the @PreConsumed callback + createPreDeleteHandler().deleteObject(entityManager, result); + // process the current exchange LOG.debug("Processing exchange: {}", exchange); getProcessor().process(exchange); @@ -166,6 +170,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { throw exchange.getException(); } + // Run the @Consumed callback getDeleteHandler().deleteObject(entityManager, result); } } @@ -204,7 +209,18 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { public void setDeleteHandler(DeleteHandler<Object> deleteHandler) { this.deleteHandler = deleteHandler; } - + + public DeleteHandler<Object> getPreDeleteHandler() { + if (preDeleteHandler == null) { + preDeleteHandler = createPreDeleteHandler(); + } + return preDeleteHandler; + } + + public void setPreDeleteHandler(DeleteHandler<Object> preDeleteHandler) { + this.preDeleteHandler = preDeleteHandler; + } + public void setParameters(Map<String, Object> params) { this.parameters = params; } @@ -340,6 +356,40 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { } } + protected DeleteHandler<Object> createPreDeleteHandler() { + // Look for @PreConsumed to allow custom callback before the Entity has been consumed + Class<?> entityType = getEndpoint().getEntityType(); + if (entityType != null) { + // Inspect the method(s) annotated with @PreConsumed + List<Method> methods = ObjectHelper.findMethodsWithAnnotation(entityType, PreConsumed.class); + if (methods.size() > 1) { + throw new IllegalStateException("Only one method can be annotated with the @PreConsumed annotation but found: " + methods); + } else if (methods.size() == 1) { + // Inspect the parameters of the @PreConsumed method + Class<?>[] parameters = methods.get(0).getParameterTypes(); + if (parameters.length != 0) { + throw new IllegalStateException("@PreConsumed annotated method cannot have parameters!"); + } + + final Method method = methods.get(0); + return new DeleteHandler<Object>() { + @Override + public void deleteObject(EntityManager entityManager, Object entityBean) { + ObjectHelper.invokeMethod(method, entityBean); + } + }; + } + } + + // else do nothing + return new DeleteHandler<Object>() { + @Override + public void deleteObject(EntityManager entityManager, Object entityBean) { + // Do nothing + } + }; + } + protected DeleteHandler<Object> createDeleteHandler() { // look for @Consumed to allow custom callback when the Entity has been consumed Class<?> entityType = getEndpoint().getEntityType(); @@ -383,7 +433,6 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { query.setParameter(entry.getKey(), entry.getValue()); } } - } protected Exchange createExchange(Object result) { http://git-wip-us.apache.org/repos/asf/camel/blob/7ac3001e/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java new file mode 100644 index 0000000..5d65b13 --- /dev/null +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/PreConsumed.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jpa; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An annotation to mark a method to be invoked <b>before</b> an entity bean is processed and routed; so + * that it can be updated in such a way that the results are available to later nodes in the route. + */ +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Target({ ElementType.METHOD }) +public @interface PreConsumed { +} http://git-wip-us.apache.org/repos/asf/camel/blob/7ac3001e/components/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java b/components/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java index 88650f1..408d68e 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java @@ -20,6 +20,10 @@ import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.Id; +import org.apache.camel.component.jpa.PreConsumed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Represents a task which is added to the database, then removed from the database when it is consumed * @@ -27,6 +31,7 @@ import javax.persistence.Id; */ @Entity public class SendEmail { + private static final Logger LOG = LoggerFactory.getLogger(SendEmail.class); private Long id; private String address; @@ -59,4 +64,12 @@ public class SendEmail { public void setAddress(String address) { this.address = address; } + + @PreConsumed + public void doBefore() { + LOG.info("Invoked the pre consumed method with address {}", address); + if ("dummy".equals(address)) { + address = "du...@somewhere.org"; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/7ac3001e/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPreConsumedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPreConsumedTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPreConsumedTest.java new file mode 100644 index 0000000..da4db29 --- /dev/null +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPreConsumedTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.jpa; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.examples.SendEmail; +import org.apache.camel.spring.SpringRouteBuilder; +import org.junit.Test; + +/** + * @version + */ +public class JpaPreConsumedTest extends AbstractJpaTest { + protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x"; + + @Test + public void testPreConsumed() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + template.sendBody("direct:start", new SendEmail("dummy")); + + assertMockEndpointsSatisfied(); + + // @PreConsumed should change the dummy address + SendEmail email = mock.getReceivedExchanges().get(0).getIn().getBody(SendEmail.class); + assertEquals("du...@somewhere.org", email.getAddress()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new SpringRouteBuilder() { + public void configure() { + from("direct:start").to("jpa://" + SendEmail.class.getName()); + + from("jpa://" + SendEmail.class.getName()).to("mock:result"); + } + }; + } + + @Override + protected String routeXml() { + return "org/apache/camel/processor/jpa/springJpaRouteTest.xml"; + } + + @Override + protected String selectAllString() { + return SELECT_ALL_STRING; + } +} \ No newline at end of file