This is an automated email from the ASF dual-hosted git repository. zbendhiba pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus-examples.git
commit b29ce9c3e64b1bb47c55090aa69261d99e2e60ca Author: Zheng Feng <zh.f...@gmail.com> AuthorDate: Tue Nov 29 09:01:11 2022 +0800 Add jms-jpa example (#123) * Add jms-jpa example * Apply suggestions from code review Co-authored-by: James Netherton <jamesnether...@users.noreply.github.com> * fix examples.json * Upgrade quarkus-artemis to 2.0.2 * Fix with some comments Co-authored-by: James Netherton <jamesnether...@users.noreply.github.com> --- docs/modules/ROOT/attachments/examples.json | 5 + {jta-jpa => jms-jpa}/README.adoc | 98 ++----- jms-jpa/pom.xml | 320 +++++++++++++++++++++ jms-jpa/src/main/java/org/acme/AuditLog.java | 61 ++++ jms-jpa/src/main/java/org/acme/CamelRoutes.java | 73 +++++ .../src/main/java/org/acme/DummyXAResource.java | 204 +++++++++++++ .../java/org/acme/DummyXAResourceRecovery.java | 81 ++++++ jms-jpa/src/main/java/org/acme/XAJmsComponent.java | 57 ++++ jms-jpa/src/main/resources/application.properties | 54 ++++ jms-jpa/src/test/java/org/acme/JtaIT.java | 23 ++ jms-jpa/src/test/java/org/acme/JtaTest.java | 62 ++++ {jta-jpa => jms-jpa}/src/test/resources/broker.xml | 8 +- jta-jpa/README.adoc | 65 +---- 13 files changed, 975 insertions(+), 136 deletions(-) diff --git a/docs/modules/ROOT/attachments/examples.json b/docs/modules/ROOT/attachments/examples.json index d920897..73a5761 100644 --- a/docs/modules/ROOT/attachments/examples.json +++ b/docs/modules/ROOT/attachments/examples.json @@ -29,6 +29,11 @@ "description": "Shows how to use Camel health-checks with Quarkus.", "link": "https://github.com/apache/camel-quarkus-examples/tree/main/health" }, + { + "title": "JMS and JPA", + "description": "Shows how to run a Camel Quarkus application that supports JTA transactions on three external transactional resources: a database (MySQL), a messaging broker (Artemis) and a simulated XAResource which can demonstrate the commit, rollback and crash recovery.", + "link": "https://github.com/apache/camel-quarkus-examples/tree/main/jms-jpa" + }, { "title": 
"JTA and JPA", "description": "Shows how to run a Camel Quarkus application that supports JTA transactions on two external transactional resources: a database (MySQL) and a simulate XAResource which can demonstrate the commit, rollback and crash recovery.", diff --git a/jta-jpa/README.adoc b/jms-jpa/README.adoc similarity index 76% copy from jta-jpa/README.adoc copy to jms-jpa/README.adoc index ab5ee6b..ca814a5 100644 --- a/jta-jpa/README.adoc +++ b/jms-jpa/README.adoc @@ -1,9 +1,9 @@ -= JTA and JPA: A Camel Quarkus example -:cq-example-description: An example that shows how to run a Camel Quarkus application that supports JTA transactions on two external transactional resources: a database (MySQL) and a simulate XAResource which can demonstrate the commit, rollback and crash recovery. += JMS and JPA: A Camel Quarkus example +:cq-example-description: An example that shows how to run a Camel Quarkus application that supports JTA transactions on three external transactional resources: a database (MySQL), a messaging broker (Artemis) and a simulated XAResource which can demonstrate the commit, rollback and crash recovery. {cq-description} -We use Narayana as standalone JTA Transaction Manager implementation, and Hibernate as JPA Adapter. +We use Narayana as the standalone JTA Transaction Manager implementation, and Hibernate as the JPA Adapter. This example will connect to a database with the connection details defined in `application.properties`. 
If the example is run in development mode and no database exists, Quarkus will create a matching database @@ -51,6 +51,25 @@ docker exec -it db-mysql mysql -uroot -proot -e \ FLUSH PRIVILEGES;" ---- +Create the `audit_log` table if it does not exist yet: +[source, shell] +---- +docker exec -it db-mysql mysql -uadmin -padmin testdb -e \ + "CREATE TABLE audit_log ( \ + id bigint NOT NULL AUTO_INCREMENT, \ + message varchar(255) DEFAULT NULL, \ + PRIMARY KEY (id) \ + );" +---- + +Start Artemis: +[source, shell] +---- +docker run --name artemis \ + -e AMQ_USER=admin -e AMQ_PASSWORD=admin \ + -d -p 61616:61616 \ + quay.io/artemiscloud/activemq-artemis-broker +---- ==== Prerequisites @@ -67,6 +86,10 @@ %prod.quarkus.datasource.password=admin %prod.quarkus.datasource.jdbc.transactions=xa %prod.quarkus.hibernate-orm.database.generation=none + +%prod.quarkus.artemis.url=tcp://localhost:61616 +%prod.quarkus.artemis.username=admin +%prod.quarkus.artemis.password=admin ---- ==== JVM mode @@ -109,7 +132,7 @@ curl $ADDRESS/api/messages Test with normal "hello" content: [source,shell] ---- -curl -X POST $ADDRESS/api/message/hello +curl -X POST $ADDRESS/api/messages/hello ---- Check the audit_log @@ -120,13 +143,13 @@ curl $ADDRESS/api/messages You should get some results like [source] ---- -[{message=hello}] +[{message=hello}, {message=hello-ok}] ---- Test rollback by calling the service with "fail" content: [source,shell] ---- -curl -X POST $ADDRESS/api/message/fail +curl -X POST $ADDRESS/api/messages/fail ---- You should not find any trace of the message in the audit_log table, and you should see some failures like [source] @@ -149,7 +172,7 @@ Stacktrace Test crash recovery by calling the service with "crash" content: [source,shell] ---- -curl -X POST $ADDRESS/api/message/crash +curl -X POST $ADDRESS/api/messages/crash ---- The application should crash, so you will not see any response. 
[source] @@ -167,67 +190,6 @@ Now restart the application, and wait about 10 seconds, then you can see the fol ---- check the audit_log table, you should see the message "crash" in the table. -== Running with Artemis JMS -If you want to use artemis-jms with XA support, you need to add the following dependency in `pom.xml` -[source, xml] ----- -<dependency> - <groupId>io.quarkiverse.artemis</groupId> - <artifactId>quarkus-artemis-jms</artifactId> - <version>1.2.0</version> -</dependency> -<dependency> - <groupId>io.quarkiverse.messaginghub</groupId> - <artifactId>quarkus-pooled-jms</artifactId> - <version>1.0.1</version> -</dependency> ----- - -And you need to add the following configuration in `application.properties` -[source, properties] ----- -# Quarkus Artemis and Messaginghub Pooled JMS -quarkus.artemis.url=tcp://localhost:61616 -quarkus.artemis.username=admin -quarkus.artemis.password=admin -quarkus.pooled-jms.xa.enabled=true ----- - -Start Artemis: -[source, shell] ----- -docker run --name artemis \ - -e AMQ_USER=admin -e AMQ_PASSWORD=admin \ - -d -p 61616:61616 \ - quay.io/artemiscloud/activemq-artemis-broker ----- - -Make some changes in `CamelRoutes` to use camel-quarkus-jms send and receive messages from Artemis. 
-[source, java] ----- -from("direct:trans") - .transacted() - .setBody(simple("${headers.message}")) - .to("bean:auditLog?method=createAuditLog(${body})") - .to("jpa:org.acme.AuditLog") - .setBody(simple("${headers.message}")) - .to("jms:outbound?disableReplyTo=true") - .choice() - .when(body().startsWith("fail")) - .log("Forced exception") - .process(x -> { - throw new RuntimeException("fail"); - }) - .otherwise() - .log("Message added: ${body}") - .endChoice(); - -from("jms:outbound") - .log("Message out: ${body}") - .to("bean:auditLog?method=createAuditLog(${body}-ok)") - .to("jpa:org.acme.AuditLog"); ----- - == Feedback Please report bugs and propose improvements via https://github.com/apache/camel-quarkus/issues[GitHub issues of Camel Quarkus] project. diff --git a/jms-jpa/pom.xml b/jms-jpa/pom.xml new file mode 100644 index 0000000..3351547 --- /dev/null +++ b/jms-jpa/pom.xml @@ -0,0 +1,320 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.camel.quarkus.examples</groupId> + <artifactId>camel-quarkus-examples-jms-jpa</artifactId> + <version>2.15.0-SNAPSHOT</version> + <name>Camel Quarkus :: Examples :: JMS JPA</name> + <description>Camel Quarkus Example :: JMS JPA</description> + <properties> + <quarkus.platform.version>2.14.0.Final</quarkus.platform.version> + <camel-quarkus.platform.version>2.15.0-SNAPSHOT</camel-quarkus.platform.version> + <quarkiverse-artemis.version>2.0.2</quarkiverse-artemis.version> + + <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id> + <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> + <camel-quarkus.platform.group-id>org.apache.camel.quarkus</camel-quarkus.platform.group-id> + <camel-quarkus.platform.artifact-id>camel-quarkus-bom</camel-quarkus.platform.artifact-id> + + <formatter-maven-plugin.version>2.17.1</formatter-maven-plugin.version> + <impsort-maven-plugin.version>1.3.2</impsort-maven-plugin.version> + <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version> + <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> + <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version> + <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + <maven.compiler.testSource>${maven.compiler.source}</maven.compiler.testSource> + <maven.compiler.testTarget>${maven.compiler.target}</maven.compiler.testTarget> + <mycila-license.version>3.0</mycila-license.version> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + 
<dependencyManagement> + <dependencies> + <dependency> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>${quarkus.platform.artifact-id}</artifactId> + <version>${quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>${camel-quarkus.platform.group-id}</groupId> + <artifactId>${camel-quarkus.platform.artifact-id}</artifactId> + <version>${camel-quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>io.quarkiverse.artemis</groupId> + <artifactId>quarkus-artemis-bom</artifactId> + <version>${quarkiverse-artemis.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-microprofile-health</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-rest</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-direct</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-bean</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-jpa</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-jms</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-jta</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-narayana-jta</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-jdbc-h2</artifactId> + 
</dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-jdbc-mysql</artifactId> + </dependency> + <dependency> + <groupId>io.quarkiverse.messaginghub</groupId> + <artifactId>quarkus-pooled-jms</artifactId> + </dependency> + <dependency> + <groupId>io.quarkiverse.artemis</groupId> + <artifactId>quarkus-artemis-jms</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-test-h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.quarkiverse.artemis</groupId> + <artifactId>quarkus-test-artemis</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>net.revelc.code.formatter</groupId> + <artifactId>formatter-maven-plugin</artifactId> + <version>${formatter-maven-plugin.version}</version> + <configuration> + <configFile>${maven.multiModuleProjectDirectory}/eclipse-formatter-config.xml</configFile> + <lineEnding>LF</lineEnding> + </configuration> + </plugin> + <plugin> + <groupId>net.revelc.code</groupId> + <artifactId>impsort-maven-plugin</artifactId> + <version>${impsort-maven-plugin.version}</version> + <configuration> + <groups>java.,javax.,org.w3c.,org.xml.,junit.</groups> + <removeUnused>true</removeUnused> + <staticAfter>true</staticAfter> + <staticGroups>java.,javax.,org.w3c.,org.xml.,junit.</staticGroups> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin.version}</version> + <configuration> + <showDeprecation>true</showDeprecation> + 
<showWarnings>true</showWarnings> + <compilerArgs> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <failIfNoTests>false</failIfNoTests> + <systemProperties> + <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager> + </systemProperties> + </configuration> + </plugin> + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus.platform.version}</version> + </plugin> + <plugin> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + </plugin> + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <version>${mycila-license.version}</version> + <configuration> + <failIfUnknown>true</failIfUnknown> + <header>${maven.multiModuleProjectDirectory}/header.txt</header> + <excludes> + <exclude>**/*.adoc</exclude> + <exclude>**/*.csv</exclude> + <exclude>**/*.txt</exclude> + <exclude>**/LICENSE.txt</exclude> + <exclude>**/LICENSE</exclude> + <exclude>**/NOTICE.txt</exclude> + <exclude>**/NOTICE</exclude> + <exclude>**/README</exclude> + <exclude>**/pom.xml.versionsBackup</exclude> + </excludes> + <mapping> + <java>SLASHSTAR_STYLE</java> + <properties>CAMEL_PROPERTIES_STYLE</properties> + <kt>SLASHSTAR_STYLE</kt> + <xml>XML_STYLE</xml> + </mapping> + <headerDefinitions> + <headerDefinition>${maven.multiModuleProjectDirectory}/license-properties-headerdefinition.xml</headerDefinition> + </headerDefinitions> + </configuration> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <executions> + <execution> 
+ <id>build</id> + <goals> + <goal>build</goal> + </goals> + </execution> + </executions> + <configuration> + <workingDir>${project.basedir}</workingDir> + </configuration> + </plugin> + <plugin> + <groupId>net.revelc.code.formatter</groupId> + <artifactId>formatter-maven-plugin</artifactId> + <executions> + <execution> + <id>format</id> + <phase>process-sources</phase> + <goals> + <goal>format</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>net.revelc.code</groupId> + <artifactId>impsort-maven-plugin</artifactId> + <executions> + <execution> + <id>sort-imports</id> + <phase>process-sources</phase> + <goals> + <goal>sort</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>native</id> + <activation> + <property> + <name>native</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + <configuration> + <systemPropertyVariables> + <quarkus.package.type>${quarkus.package.type}</quarkus.package.type> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <properties> + <quarkus.package.type>native</quarkus.package.type> + </properties> + </profile> + </profiles> +</project> diff --git a/jms-jpa/src/main/java/org/acme/AuditLog.java b/jms-jpa/src/main/java/org/acme/AuditLog.java new file mode 100644 index 0000000..c17365b --- /dev/null +++ b/jms-jpa/src/main/java/org/acme/AuditLog.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Named; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@Entity +@Table(name = "audit_log") +@NamedQueries({ + @NamedQuery(name = "getAuditLog", query = "select al from AuditLog al") +}) +@Named("auditLog") +@ApplicationScoped +@RegisterForReflection +public class AuditLog { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "id") + private long id; + private String message; + + public String getMessage() { + return message; + } + + public AuditLog createAuditLog(String message) { + AuditLog auditLog = new AuditLog(); + auditLog.message = message; + return auditLog; + } + + @Override + public String toString() { + return String.format("{message=%s}", message); + } +} diff --git a/jms-jpa/src/main/java/org/acme/CamelRoutes.java b/jms-jpa/src/main/java/org/acme/CamelRoutes.java new file mode 100644 index 0000000..5434c6b --- /dev/null +++ b/jms-jpa/src/main/java/org/acme/CamelRoutes.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.transaction.TransactionManager; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.rest.RestParamType; + +@ApplicationScoped +public class CamelRoutes extends RouteBuilder { + @Inject + TransactionManager transactionManager; + + @Override + public void configure() { + rest("/messages") + .produces("text/plain") + .get() + .to("direct:messages") + .post("/{message}") + .param().name("message").type(RestParamType.path).dataType("string").endParam() + .to("direct:trans"); + + from("direct:messages") + .to("jpa:org.acme.AuditLog?namedQuery=getAuditLog") + .convertBodyTo(String.class); + + from("direct:trans") + .transacted() + .setBody(simple("${headers.message}")) + .process(x -> { + DummyXAResource xaResource = new DummyXAResource("crash".equals(x.getIn().getBody(String.class))); + transactionManager.getTransaction().enlistResource(xaResource); + }) + .to("jms:outbound?disableReplyTo=true") + .to("bean:auditLog?method=createAuditLog(${body})") + .to("jpa:org.acme.AuditLog") + .setBody(simple("${headers.message}")) + .choice() + .when(body().startsWith("fail")) + .log("Forced 
exception") + .process(x -> { + throw new RuntimeException("fail"); + }) + .otherwise() + .log("Message added: ${body}") + .endChoice(); + + from("xajms:outbound") + .transacted() + .log("Message out: ${body}") + .to("bean:auditLog?method=createAuditLog(${body}-ok)") + .to("jpa:org.acme.AuditLog"); + + } +} diff --git a/jms-jpa/src/main/java/org/acme/DummyXAResource.java b/jms-jpa/src/main/java/org/acme/DummyXAResource.java new file mode 100644 index 0000000..fca7860 --- /dev/null +++ b/jms-jpa/src/main/java/org/acme/DummyXAResource.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import com.arjuna.ats.arjuna.common.Uid; +import org.jboss.logging.Logger; + +/** + * This class is used solely for simulating system crash. 
+ * + */ +public class DummyXAResource implements XAResource { + private Logger LOG = Logger.getLogger(DummyXAResource.class); + + public static final String LOG_DIR = "target/DummyXAResource/"; + + private final boolean shouldCrash; + + private Xid xid; + + private File file; + + public DummyXAResource(boolean shouldCrash) { + this.shouldCrash = shouldCrash; + } + + /** + * Constructor used by recovery manager to recreate XAResource + * + * @param file File where Xid of the XAResource is stored + */ + public DummyXAResource(File file) throws IOException { + this.shouldCrash = false; + this.file = file; + this.xid = getXidFromFile(file); + } + + public int prepare(final Xid xid) throws XAException { + LOG.info("Preparing " + DummyXAResource.class.getSimpleName()); + + this.file = writeXidToFile(xid, LOG_DIR); + + return XA_OK; + } + + public void commit(final Xid xid, final boolean arg1) throws XAException { + LOG.info("Committing " + DummyXAResource.class.getSimpleName()); + + if (shouldCrash) { + LOG.info("Crashing the system"); + Runtime.getRuntime().halt(1); + } + + removeFile(file); + this.file = null; + this.xid = null; + } + + public void rollback(final Xid xid) throws XAException { + LOG.info("Rolling back " + DummyXAResource.class.getSimpleName()); + + removeFile(file); + this.file = null; + this.xid = null; + } + + public boolean isSameRM(XAResource xaResource) throws XAException { + if (!(xaResource instanceof DummyXAResource)) { + return false; + } + + DummyXAResource other = (DummyXAResource) xaResource; + + return xid != null && other.xid != null && xid.getFormatId() == other.xid.getFormatId() + && Arrays.equals(xid.getGlobalTransactionId(), other.xid.getGlobalTransactionId()) + && Arrays.equals(xid.getBranchQualifier(), other.xid.getBranchQualifier()); + } + + public Xid[] recover(int flag) throws XAException { + return new Xid[] { xid }; + } + + public void start(Xid xid, int flags) throws XAException { + + } + + public void end(Xid xid, int flags) 
throws XAException { + + } + + public void forget(Xid xid) throws XAException { + + } + + public int getTransactionTimeout() throws XAException { + return 0; + } + + public boolean setTransactionTimeout(final int seconds) throws XAException { + return true; + } + + private Xid getXidFromFile(File file) throws IOException { + try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) { + int formatId = inputStream.readInt(); + int globalTransactionIdLength = inputStream.readInt(); + byte[] globalTransactionId = new byte[globalTransactionIdLength]; + inputStream.read(globalTransactionId, 0, globalTransactionIdLength); + int branchQualifierLength = inputStream.readInt(); + byte[] branchQualifier = new byte[branchQualifierLength]; + inputStream.read(branchQualifier, 0, branchQualifierLength); + + return new XidImpl(formatId, globalTransactionId, branchQualifier); + } + } + + private File writeXidToFile(Xid xid, String directory) throws XAException { + File dir = new File(directory); + + if (!dir.exists() && !dir.mkdirs()) { + throw new XAException(XAException.XAER_RMERR); + } + + File file = new File(dir, new Uid().fileStringForm() + "_"); + + try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(file))) { + outputStream.writeInt(xid.getFormatId()); + outputStream.writeInt(xid.getGlobalTransactionId().length); + outputStream.write(xid.getGlobalTransactionId(), 0, xid.getGlobalTransactionId().length); + outputStream.writeInt(xid.getBranchQualifier().length); + outputStream.write(xid.getBranchQualifier(), 0, xid.getBranchQualifier().length); + outputStream.flush(); + } catch (IOException e) { + throw new XAException(XAException.XAER_RMERR); + } + + return file; + } + + private void removeFile(File file) throws XAException { + if (file != null) { + if (!file.delete()) { + throw new XAException(XAException.XA_RETRY); + } + } + } + + private class XidImpl implements Xid { + + private final int formatId; + + private final 
byte[] globalTransactionId; + + private final byte[] branchQualifier; + + public XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) { + this.formatId = formatId; + this.globalTransactionId = globalTransactionId; + this.branchQualifier = branchQualifier; + } + + @Override + public int getFormatId() { + return formatId; + } + + @Override + public byte[] getGlobalTransactionId() { + return globalTransactionId; + } + + @Override + public byte[] getBranchQualifier() { + return branchQualifier; + } + + } +} diff --git a/jms-jpa/src/main/java/org/acme/DummyXAResourceRecovery.java b/jms-jpa/src/main/java/org/acme/DummyXAResourceRecovery.java new file mode 100644 index 0000000..c6d3321 --- /dev/null +++ b/jms-jpa/src/main/java/org/acme/DummyXAResourceRecovery.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.acme; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.transaction.xa.XAResource; + +import io.quarkus.runtime.Startup; +import org.jboss.logging.Logger; +import org.jboss.tm.XAResourceRecovery; +import org.jboss.tm.XAResourceRecoveryRegistry; + +/** + * This class is used solely for simulating system crash. + * + */ +@Startup +public class DummyXAResourceRecovery implements XAResourceRecovery { + private Logger LOG = Logger.getLogger(DummyXAResourceRecovery.class); + + @Inject + XAResourceRecoveryRegistry xaResourceRecoveryRegistry; + + @PostConstruct + void init() { + LOG.info("register DummyXAResourceRecovery"); + xaResourceRecoveryRegistry.addXAResourceRecovery(this); + } + + @Override + public XAResource[] getXAResources() throws RuntimeException { + List<DummyXAResource> resources = Collections.emptyList(); + try { + resources = getXAResourcesFromDirectory(DummyXAResource.LOG_DIR); + } catch (IOException e) { + } + + if (!resources.isEmpty()) { + LOG.info(DummyXAResourceRecovery.class.getSimpleName() + " returning list of resources: " + resources); + } + + return resources.toArray(new XAResource[] {}); + } + + private List<DummyXAResource> getXAResourcesFromDirectory(String directory) throws IOException { + List<DummyXAResource> resources = new ArrayList<>(); + + Files.newDirectoryStream(FileSystems.getDefault().getPath(directory), "*_").forEach(path -> { + try { + resources.add(new DummyXAResource(path.toFile())); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + return resources; + } + +} diff --git a/jms-jpa/src/main/java/org/acme/XAJmsComponent.java b/jms-jpa/src/main/java/org/acme/XAJmsComponent.java new file mode 100644 index 0000000..562f38e --- /dev/null +++ 
b/jms-jpa/src/main/java/org/acme/XAJmsComponent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.acme;
+
+import javax.enterprise.context.Dependent;
+import javax.enterprise.inject.Produces;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.jms.ConnectionFactory;
+import javax.transaction.TransactionManager;
+import javax.transaction.TransactionSynchronizationRegistry;
+import javax.transaction.UserTransaction;
+
+import org.apache.camel.component.jms.JmsComponent;
+import org.springframework.transaction.jta.JtaTransactionManager;
+
+/**
+ * CDI producer of the Camel {@link JmsComponent} published under the name {@code xajms}.
+ * <p>
+ * The component is assembled from the injected JMS {@link ConnectionFactory} and a Spring
+ * {@link JtaTransactionManager} wrapping the injected JTA objects.
+ */
+@Dependent
+public class XAJmsComponent {
+    @Inject
+    ConnectionFactory connectionFactory;
+
+    @Inject
+    TransactionManager transactionManager;
+
+    @Inject
+    TransactionSynchronizationRegistry transactionSynchronizationRegistry;
+
+    @Inject
+    UserTransaction userTransaction;
+
+    /**
+     * Builds the {@code xajms} component.
+     *
+     * @return a new {@link JmsComponent} using the injected connection factory and a JTA-backed
+     *         transaction manager
+     */
+    @Produces
+    @Named("xajms")
+    public JmsComponent xaJms() {
+        // Adapt the injected JTA objects to the Spring transaction-manager type the component accepts.
+        JtaTransactionManager springJta = new JtaTransactionManager(userTransaction, transactionManager);
+        springJta.setTransactionSynchronizationRegistry(transactionSynchronizationRegistry);
+
+        JmsComponent xaJmsComponent = new JmsComponent();
+        xaJmsComponent.setConnectionFactory(connectionFactory);
+        // NOTE(review): presumably disabled because the JTA manager, not a local JMS session,
+        // owns the transaction boundary - confirm.
+        xaJmsComponent.setTransacted(false);
+        xaJmsComponent.setTransactionManager(springJta);
+        return xaJmsComponent;
+    }
+}
diff --git a/jms-jpa/src/main/resources/application.properties b/jms-jpa/src/main/resources/application.properties new file mode 100644 index 0000000..300ddf1 --- /dev/null +++ b/jms-jpa/src/main/resources/application.properties @@ -0,0 +1,54 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +# +# Quarkus +# +quarkus.banner.enabled = false +quarkus.log.file.enable = true + +# Default Datasource +quarkus.datasource.db-kind=h2 +quarkus.datasource.jdbc.max-size=8 +quarkus.datasource.jdbc.transactions=xa + +# Production Datasource +#%prod.quarkus.datasource.db-kind=mysql +#%prod.quarkus.datasource.username=admin +#%prod.quarkus.datasource.password=admin +#%prod.quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/testdb +#%prod.quarkus.datasource.jdbc.transactions=xa + +# Hibernate ORM +quarkus.hibernate-orm.database.generation=drop-and-create +#%prod.quarkus.hibernate-orm.database.generation=none + +# Quarkus Narayana JTA +quarkus.transaction-manager.object-store-directory=target/narayana +quarkus.transaction-manager.enable-recovery=true + +# Camel +camel.rest.context-path=/api + +# Quarkus Artemis +quarkus.artemis.enabled=true +%test.quarkus.artemis.devservices.enabled=false +#%prod.quarkus.artemis.url=tcp://localhost:61616 +#%prod.quarkus.artemis.username=admin +#%prod.quarkus.artemis.password=admin + +# Quarkus MessagingHub Pooled JMS +quarkus.pooled-jms.xa.enabled=true diff --git a/jms-jpa/src/test/java/org/acme/JtaIT.java b/jms-jpa/src/test/java/org/acme/JtaIT.java new file mode 100644 index 0000000..a3d7f98 --- /dev/null +++ b/jms-jpa/src/test/java/org/acme/JtaIT.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.acme;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+/**
+ * Re-runs the tests inherited from {@link JtaTest} under {@link QuarkusIntegrationTest}.
+ * The class adds no tests of its own.
+ */
+@QuarkusIntegrationTest
+public class JtaIT extends JtaTest {
+}
diff --git a/jms-jpa/src/test/java/org/acme/JtaTest.java b/jms-jpa/src/test/java/org/acme/JtaTest.java new file mode 100644 index 0000000..a92281c --- /dev/null +++ b/jms-jpa/src/test/java/org/acme/JtaTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.acme;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import io.quarkus.artemis.test.ArtemisTestResource;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * HTTP-level tests of the {@code /api/messages} endpoints, run with the H2 and Artemis test resources.
+ */
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+@QuarkusTestResource(ArtemisTestResource.class)
+public class JtaTest {
+    /**
+     * Posts a random message and waits up to five seconds for the message listing to contain
+     * that message followed by {@code -ok}.
+     */
+    @Test
+    public void testXA() {
+        final String message = UUID.randomUUID().toString();
+
+        given().when().post("/api/messages/" + message)
+                .then()
+                .statusCode(200);
+
+        Awaitility.await()
+                .atMost(5, TimeUnit.SECONDS)
+                .until(() -> {
+                    String listing = given().when().get("/api/messages")
+                            .then().statusCode(200).extract().asString();
+                    return listing.contains(message + "-ok");
+                });
+    }
+
+    /**
+     * Posts the {@code fail} message, expects HTTP 500, and checks that the message listing is
+     * the same as it was before the request.
+     */
+    @Test
+    public void testRollback() {
+        // Snapshot of the listing taken before the failing request.
+        final String listingBefore = given().when().get("/api/messages").asString();
+
+        given().when().post("/api/messages/fail")
+                .then()
+                .statusCode(500);
+
+        given().when().get("/api/messages")
+                .then()
+                .statusCode(200)
+                .body(is(listingBefore));
+    }
+}
diff --git a/jta-jpa/src/test/resources/broker.xml b/jms-jpa/src/test/resources/broker.xml similarity index 88% rename from jta-jpa/src/test/resources/broker.xml rename to jms-jpa/src/test/resources/broker.xml index 3f6e095..274a4dd 100644 --- a/jta-jpa/src/test/resources/broker.xml +++ b/jms-jpa/src/test/resources/broker.xml @@ -24,19 +24,19 @@ <large-messages-directory>./target/artemis/large-messages</large-messages-directory> <connectors> - <connector name="activemq">tcp://localhost:61616</connector> + <connector name="activemq">tcp://localhost:61617</connector> </connectors> <acceptors> - <acceptor name="activemq">tcp://localhost:61616</acceptor> + <acceptor name="activemq">tcp://localhost:61617</acceptor> </acceptors> <max-disk-usage>-1</max-disk-usage> 
<security-enabled>false</security-enabled> <addresses> - <address name="test-jms"> + <address name="outbound"> <anycast> - <queue name="test-jms"/> + <queue name="outbound"/> </anycast> </address> </addresses> diff --git a/jta-jpa/README.adoc b/jta-jpa/README.adoc index ab5ee6b..b357429 100644 --- a/jta-jpa/README.adoc +++ b/jta-jpa/README.adoc @@ -51,11 +51,9 @@ docker exec -it db-mysql mysql -uroot -proot -e \ FLUSH PRIVILEGES;" ---- - - ==== Prerequisites - Make sure `io.quarkus:quarkus-jdbc-mysql` has been added in `pom.xml` -- Make sure `db-mysql` and `artemis` has been started and ready for servicing +- Make sure `db-mysql` has been started and ready for servicing - Edit `src/main/resources/application.properties` to uncomment all `%prod` lines [source, properties] ---- @@ -167,67 +165,6 @@ Now restart the application, and wait about 10 seconds, then you can see the fol ---- Check the audit_log table, you should see the message "crash" in the table. -== Running with Artemis JMS -If you want to use artemis-jms with XA support, you need to add the following dependency in `pom.xml` -[source, xml] ----- -<dependency> - <groupId>io.quarkiverse.artemis</groupId> - <artifactId>quarkus-artemis-jms</artifactId> - <version>1.2.0</version> -</dependency> -<dependency> - <groupId>io.quarkiverse.messaginghub</groupId> - <artifactId>quarkus-pooled-jms</artifactId> - <version>1.0.1</version> -</dependency> ----- - -And you need to add the following configuration in `application.properties` -[source, properties] ----- -# Quarkus Artemis and Messaginghub Pooled JMS -quarkus.artemis.url=tcp://localhost:61616 -quarkus.artemis.username=admin -quarkus.artemis.password=admin -quarkus.pooled-jms.xa.enabled=true ----- - -Start Artemis: -[source, shell] ----- -docker run --name artemis \ - -e AMQ_USER=admin -e AMQ_PASSWORD=admin \ - -d -p 61616:61616 \ - quay.io/artemiscloud/activemq-artemis-broker ----- - -Make some changes in `CamelRoutes` to use camel-quarkus-jms send and receive 
messages from Artemis. -[source, java] ----- -from("direct:trans") - .transacted() - .setBody(simple("${headers.message}")) - .to("bean:auditLog?method=createAuditLog(${body})") - .to("jpa:org.acme.AuditLog") - .setBody(simple("${headers.message}")) - .to("jms:outbound?disableReplyTo=true") - .choice() - .when(body().startsWith("fail")) - .log("Forced exception") - .process(x -> { - throw new RuntimeException("fail"); - }) - .otherwise() - .log("Message added: ${body}") - .endChoice(); - -from("jms:outbound") - .log("Message out: ${body}") - .to("bean:auditLog?method=createAuditLog(${body}-ok)") - .to("jpa:org.acme.AuditLog"); ----- - == Feedback Please report bugs and propose improvements via https://github.com/apache/camel-quarkus/issues[GitHub issues of Camel Quarkus] project.