This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch 3.8.x in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/3.8.x by this push: new 74b847a350 Jt400: tests are not cleaning after themselves and parallel run fails 74b847a350 is described below commit 74b847a350c7bd018e4f4e881bfa2d61523d13eb Author: JiriOndrusek <ondrusek.j...@gmail.com> AuthorDate: Mon Apr 15 13:07:33 2024 +0200 Jt400: tests are not cleaning after themselves and parallel run fails --- integration-tests/jt400/README.adoc | 24 +- integration-tests/jt400/pom.xml | 5 + .../component/jt400/it/InquiryMessageHolder.java | 43 +++ .../quarkus/component/jt400/it/Jt400Resource.java | 97 ++++-- .../quarkus/component/jt400/it/Jt400Routes.java | 22 +- .../src/main/resources/application.properties | 2 +- .../quarkus/component/jt400/it/Jt400Test.java | 245 +++++++------- .../component/jt400/it/Jt400TestResource.java | 352 +++++++++++++++++++++ 8 files changed, 645 insertions(+), 145 deletions(-) diff --git a/integration-tests/jt400/README.adoc b/integration-tests/jt400/README.adoc index 0fa4e22359..241dbf8c77 100644 --- a/integration-tests/jt400/README.adoc +++ b/integration-tests/jt400/README.adoc @@ -110,4 +110,26 @@ $Env:JT400_KEYED_QUEUE="#lkeyedqueue_if_not_TESTKEYED.DTAQ" $Env:JT400_MESSAGE_QUEUE="#messagequeue_if_not_TESTMSGQ.MSGQ" $Env:JT400_MESSAGE_REPLYTO_QUEUE="#messagequeueinquiry_if_not_REPLYMSGQ.MSGQ" $Env:JT400_USER_SPACE="#userspace_if_not_PROGCALL" -``` \ No newline at end of file +``` + +=== Clear queues after unexpected failures + +If the tests finish without an unexpected failure, they take care of clearing the data. +In some cases, data might remain on the real server if a test fails unexpectedly. +This state might affect subsequent executions. + +A full clear (of each queue) can be forced by adding the parameter +``` +-Dcq.jt400.clear-all=true +``` +Be aware that with `-Dcq.jt400.clear-all=true`, the tests cannot finish successfully when run in parallel. 
+ +Using the clear-queues parameter is *strongly* recommended during development. + + +==== Parallel runs and locking + +A simple locking mechanism is implemented for the tests to allow parallel executions. + +Whenever a test is started, a new entry is written into the keyed data queue `JT400_KEYED_QUEUE` with the key `cq.jt400.global-lock`, and the entry is removed after the run. +The tests are able to clear this lock even if a previous execution failed unexpectedly. \ No newline at end of file diff --git a/integration-tests/jt400/pom.xml b/integration-tests/jt400/pom.xml index 8d8f352c70..22e1efd5fe 100644 --- a/integration-tests/jt400/pom.xml +++ b/integration-tests/jt400/pom.xml @@ -71,6 +71,11 @@ <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> </dependencies> <profiles> diff --git a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java new file mode 100644 index 0000000000..18030cc163 --- /dev/null +++ b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.jt400.it; + +import jakarta.inject.Singleton; + +@Singleton +public class InquiryMessageHolder { + + private String messageText; + + private boolean processed = false; + + public String getMessageText() { + return messageText; + } + + public void setMessageText(String messageText) { + this.messageText = messageText; + } + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } +} diff --git a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java index c65f268978..9fd65df78c 100644 --- a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java +++ b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java @@ -16,19 +16,19 @@ */ package org.apache.camel.quarkus.component.jt400.it; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import com.ibm.as400.access.AS400; -import com.ibm.as400.access.MessageQueue; import com.ibm.as400.access.QueuedMessage; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; import 
jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; @@ -80,6 +80,9 @@ public class Jt400Resource { @Inject CamelContext context; + @Inject + InquiryMessageHolder inquiryMessageHolder; + @Path("/dataQueue/read/") @POST @Produces(MediaType.APPLICATION_JSON) @@ -101,7 +104,7 @@ public class Jt400Resource { Exchange ex = consumerTemplate.receive(getUrlForLibrary(suffix.toString())); if ("binary".equals(format)) { - return generateResponse(new String(ex.getIn().getBody(byte[].class), Charset.forName("Cp037")), ex); + return generateResponse(new String(ex.getIn().getBody(byte[].class), StandardCharsets.UTF_8), ex); } return generateResponse(ex.getIn().getBody(String.class), ex); @@ -112,61 +115,85 @@ public class Jt400Resource { @Consumes(MediaType.TEXT_PLAIN) @Produces(MediaType.TEXT_PLAIN) public Response keyedDataQueueWrite(@QueryParam("key") String key, - @QueryParam("searchType") String searchType, + @QueryParam("format") String format, String data) { + String _format = Optional.ofNullable(format).orElse("text"); boolean keyed = key != null; StringBuilder suffix = new StringBuilder(); Map<String, Object> headers = new HashMap<>(); + String msg; if (keyed) { - suffix.append(jt400KeyedQueue).append("?keyed=true"); + suffix.append(jt400KeyedQueue).append("?keyed=true").append("&format=").append(_format); headers.put(Jt400Endpoint.KEY, key); + msg = "Hello From KDQ: " + data; } else { - suffix.append(jt400LifoQueue); + suffix.append(jt400LifoQueue).append("?format=").append(_format); + msg = "Hello From DQ: " + data; } - Object ex = producerTemplate.requestBodyAndHeaders( - getUrlForLibrary(suffix.toString()), - "Hello " + data, - headers); - return Response.ok().entity(ex).build(); + Object retVal; + if ("binary".equals(format)) { + byte[] result = (byte[]) producerTemplate.requestBodyAndHeaders( + getUrlForLibrary(suffix.toString()), + ("Hello (bin) " + data).getBytes(StandardCharsets.UTF_8), + headers); + retVal = new 
String(result, StandardCharsets.UTF_8); + } else { + retVal = producerTemplate.requestBodyAndHeaders( + getUrlForLibrary(suffix.toString()), + msg, + headers); + } + + return Response.ok().entity(retVal).build(); } - @Path("/client/inquiryMessage/write/") - @POST + @Path("/route/start/{route}") + @GET @Produces(MediaType.TEXT_PLAIN) - public Response clientInquiryMessageWrite(String data) throws Exception { - Jt400Endpoint jt400Endpoint = context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue), Jt400Endpoint.class); - AS400 as400 = jt400Endpoint.getConfiguration().getConnection(); - //send inquiry message (with the same client as is used in the component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another job`. - MessageQueue queue = new MessageQueue(as400, jt400Endpoint.getConfiguration().getObjectPath()); - try { - queue.sendInquiry(data, "/QSYS.LIB/" + jt400Library + ".LIB/" + jt400MessageReplyToQueue); - } catch (Exception e) { - return Response.status(500).entity(e.getMessage()).build(); + public Response startRoute(@PathParam("route") String routeName) throws Exception { + if (context.getRouteController().getRouteStatus(routeName).isStartable()) { + context.getRouteController().startRoute(routeName); } - return Response.ok().build(); + + return Response.ok().entity(context.getRouteController().getRouteStatus(routeName).isStarted()).build(); } - @Path("/client/queuedMessage/read") - @POST + @Path("/route/stop/{route}") + @GET @Produces(MediaType.TEXT_PLAIN) - public Response clientQueuedMessageRead(String queueName) throws Exception { + public Response stopRoute(@PathParam("route") String routeName) throws Exception { + if (context.getRouteController().getRouteStatus(routeName).isStoppable()) { + context.getRouteController().stopRoute(routeName); + } + boolean resp = context.getRouteController().getRouteStatus(routeName).isStopped(); + + //stop component to avoid CPF2451 Message queue REPLYMSGQ is allocated to another job. 
+ Jt400Endpoint jt400Endpoint = context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue), Jt400Endpoint.class); + jt400Endpoint.close(); - Jt400Endpoint jt400Endpoint = context.getEndpoint(getUrlForLibrary(queueName), Jt400Endpoint.class); - AS400 as400 = jt400Endpoint.getConfiguration().getConnection(); - //send inquiry message (with the same client as is used in the component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another job`. - MessageQueue queue = new MessageQueue(as400, jt400Endpoint.getConfiguration().getObjectPath()); - QueuedMessage message = queue.receive(null); + return Response.ok().entity(resp).build(); + } - return Response.ok().entity(message != null ? message.getText() : "").build(); + @Path("/inquiryMessageSetExpected") + @POST + public void inquiryMessageSetExpected(String msg) { + inquiryMessageHolder.setMessageText(msg); + } + + @Path("/inquiryMessageProcessed") + @GET + @Produces(MediaType.TEXT_PLAIN) + public String inquiryMessageProcessed() { + return String.valueOf(inquiryMessageHolder.isProcessed()); } @Path("/messageQueue/write/") @POST @Produces(MediaType.TEXT_PLAIN) public Response messageQueueWrite(String data) { - Object ex = producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello " + data); + Object ex = producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello from MQ: " + data); return Response.ok().entity(ex).build(); } @@ -176,7 +203,7 @@ public class Jt400Resource { @Produces(MediaType.APPLICATION_JSON) public Response messageQueueRead(@QueryParam("queue") String queue) { Exchange ex = consumerTemplate - .receive(getUrlForLibrary(queue == null ? jt400MessageQueue : queue)); + .receive(getUrlForLibrary(queue == null ? 
jt400MessageQueue : queue) + "?messageAction=SAME"); return generateResponse(ex.getIn().getBody(String.class), ex); } diff --git a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java index 5615454c86..a3ce493919 100644 --- a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java +++ b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java @@ -18,12 +18,15 @@ package org.apache.camel.quarkus.component.jt400.it; import com.ibm.as400.access.AS400Message; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jt400.Jt400Constants; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; @ApplicationScoped public class Jt400Routes extends RouteBuilder { + private static final Logger LOGGER = Logger.getLogger(Jt400Routes.class); @ConfigProperty(name = "cq.jt400.library") String jt400Library; @@ -40,16 +43,31 @@ public class Jt400Routes extends RouteBuilder { @ConfigProperty(name = "cq.jt400.message-replyto-queue") String jt400MessageReplyToQueue; + @Inject + InquiryMessageHolder inquiryMessageHolder; + @Override public void configure() throws Exception { from(getUrlForLibrary(jt400MessageReplyToQueue + "?sendingReply=true")) + .id("inquiryRoute") + //route has tobe stopped to avoid "CPF2451 Message queue REPLYMSGQ is allocated to another job." 
+ .autoStartup(false) .choice() .when(header(Jt400Constants.MESSAGE_TYPE).isEqualTo(AS400Message.INQUIRY)) .process((exchange) -> { - String reply = "reply to: " + exchange.getIn().getBody(String.class); + String msg = exchange.getIn().getBody(String.class); + LOGGER.debug( + "Inquiry route: received '" + msg + "' (expecting '" + inquiryMessageHolder.getMessageText() + + "')"); + if (inquiryMessageHolder.getMessageText() != null && !inquiryMessageHolder.getMessageText().equals(msg)) { + throw new IllegalStateException( + "Intentional! Current exchange is not triggered by current test process, therefore ignoring the exchange"); + } + String reply = "reply to: " + msg; exchange.getIn().setBody(reply); }) - .to(getUrlForLibrary(jt400MessageReplyToQueue)); + .to(getUrlForLibrary(jt400MessageReplyToQueue)) + .process(e -> inquiryMessageHolder.setProcessed(true)); } private String getUrlForLibrary(String suffix) { diff --git a/integration-tests/jt400/src/main/resources/application.properties b/integration-tests/jt400/src/main/resources/application.properties index db47b32aab..296c10e1a0 100644 --- a/integration-tests/jt400/src/main/resources/application.properties +++ b/integration-tests/jt400/src/main/resources/application.properties @@ -31,4 +31,4 @@ cq.jt400.user-space=${JT400_USER_SPACE:PROGCALL} cq.jt400.message-queue=${JT400_MESSAGE_QUEUE:TESTMSGQ.MSGQ} cq.jt400.message-replyto-queue=${JT400_MESSAGE_REPLYTO_QUEUE:REPLYMSGQ.MSGQ} cq.jt400.keyed-queue=${JT400_KEYED_QUEUE:TESTKEYED.DTAQ} -cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ} +cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ} \ No newline at end of file diff --git a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java index c1168b7644..3627dd198a 100644 --- a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java +++ 
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java @@ -16,117 +16,117 @@ */ package org.apache.camel.quarkus.component.jt400.it; -import java.io.IOException; import java.util.Locale; -import java.util.function.BiFunction; - -import com.ibm.as400.access.AS400; -import com.ibm.as400.access.AS400SecurityException; -import com.ibm.as400.access.ErrorCompletingRequestException; -import com.ibm.as400.access.KeyedDataQueue; -import com.ibm.as400.access.MessageQueue; -import com.ibm.as400.access.ObjectDoesNotExistException; +import java.util.concurrent.TimeUnit; + +import com.ibm.as400.access.QueuedMessage; +import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; import org.apache.camel.component.jt400.Jt400Constants; import org.apache.commons.lang3.RandomStringUtils; -import org.eclipse.microprofile.config.ConfigProvider; +import org.awaitility.Awaitility; import org.hamcrest.Matchers; +import org.jboss.logging.Logger; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; @QuarkusTest @EnabledIfEnvironmentVariable(named = "JT400_URL", matches = ".+") +@QuarkusTestResource(Jt400TestResource.class) public class Jt400Test { + private static final Logger LOGGER = Logger.getLogger(Jt400Test.class); + + private final int MSG_LENGTH = 20; + //tests may be executed in parallel, therefore the timeout is a little bigger in case the test has to wait for another one + private final int WAIT_IN_SECONDS = 20; @BeforeAll public static void beforeAll() throws Exception { - //read all messages from the queues to be sure that they are empty - - //clear reply-to message queue - clearQueue("cq.jt400.message-replyto-queue", - (as400, path) -> { - try { - return new MessageQueue(as400, path).receive(null); - } catch (Exception e) { - return null; - 
} - }); - - //clear message queue - clearQueue("cq.jt400.message-queue", - (as400, path) -> { - try { - return new MessageQueue(as400, path).receive(null); - } catch (Exception e) { - return null; - } - }); - - //clear keyed queue for key1 - clearQueue("cq.jt400.message-queue", - (as400, path) -> { - try { - return new KeyedDataQueue(as400, path).read("key1"); - } catch (Exception e) { - return null; - } - }); - - //clear keyed queue for key2 - clearQueue("cq.jt400.message-queue", - (as400, path) -> { - try { - return new KeyedDataQueue(as400, path).read("key1"); - } catch (Exception e) { - return null; - } - }); + //for development purposes + // logQueues(); + + //lock execution + Jt400TestResource.CLIENT_HELPER.lock(); + } + + @AfterAll + public static void afterAll() throws Exception { + getClientHelper().unlock(); + } + + private static void logQueues() throws Exception { + StringBuilder sb = new StringBuilder("\n"); + sb.append("**********************************************************"); + sb.append(getClientHelper().dumpQueues()); + sb.append("\n**********************************************************\n"); + LOGGER.info(sb.toString()); } @Test public void testDataQueue() { - String msg = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); + LOGGER.debug("** testDataQueue() ** has started "); + + String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); + String answer = "Hello From DQ: " + msg; RestAssured.given() .body(msg) .post("/jt400/dataQueue/write") .then() .statusCode(200) - .body(Matchers.equalTo("Hello " + msg)); + .body(Matchers.equalTo(answer)); + + LOGGER.debug("testDataQueue: message '" + answer + "' was written. 
"); + + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.lifoQueueu, answer); RestAssured.post("/jt400/dataQueue/read") .then() .statusCode(200) - .body("result", Matchers.equalTo("Hello " + msg)); + .body("result", Matchers.equalTo(answer)); } @Test - public void testDataQueueBinary() { - String msg = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); + public void testDataQueueBinary() throws Exception { + LOGGER.debug("** testDataQueueBinary() ** has started "); + String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); + String answer = "Hello (bin) " + msg; RestAssured.given() .body(msg) + .queryParam("format", "binary") .post("/jt400/dataQueue/write") .then() .statusCode(200) - .body(Matchers.equalTo("Hello " + msg)); + .body(Matchers.equalTo(answer)); + + LOGGER.debug("testDataQueueBinary: message '" + answer + "' was written. "); + + //register to delete + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.lifoQueueu, answer); RestAssured.given() .queryParam("format", "binary") .post("/jt400/dataQueue/read") .then() .statusCode(200) - .body("result", Matchers.equalTo("Hello " + msg)); + .body("result", Matchers.equalTo(answer)); } @Test public void testKeyedDataQueue() { - String msg1 = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); - String msg2 = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); - String key1 = "key1"; - String key2 = "key2"; + LOGGER.debug("** testKeyedDataQueue() ** has started "); + String msg1 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); + String msg2 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); + String answer1 = "Hello From KDQ: " + msg1; + String answer2 = "Hello From KDQ: " + msg2; + + String key1 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH - 1).toLowerCase(Locale.ROOT); + //key2 is right after key1 + String key2 = key1 + "a"; 
RestAssured.given() .body(msg1) @@ -134,79 +134,131 @@ public class Jt400Test { .post("/jt400/dataQueue/write/") .then() .statusCode(200) - .body(Matchers.equalTo("Hello " + msg1)); + .body(Matchers.equalTo(answer1)); + + LOGGER.debug("testKeyedDataQueue: message '" + answer1 + " (key " + key1 + ") was written. "); + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.keyedDataQue, key1); RestAssured.given() - .body("Sheldon2") + .body(msg2) .queryParam("key", key2) .post("/jt400/dataQueue/write/") .then() .statusCode(200) - .body(Matchers.equalTo("Hello Sheldon2")); + .body(Matchers.equalTo(answer2)); + + LOGGER.debug("testKeyedDataQueue: message '" + answer2 + " (key " + key2 + ") was written. "); + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.keyedDataQue, key2); RestAssured.given() .body(key1) .post("/jt400/dataQueue/read/") .then() .statusCode(200) - .body("result", Matchers.equalTo("Hello " + msg1)) + .body("result", Matchers.equalTo(answer1)) .body(Jt400Constants.KEY, Matchers.equalTo(key1)); RestAssured.given() .body(key1) - .queryParam("searchType", "NE") + .queryParam("searchType", "GE") .post("/jt400/dataQueue/read/") .then() .statusCode(200) - .body("result", Matchers.not(Matchers.equalTo("Hello " + msg2))) + .body("result", Matchers.not(Matchers.equalTo(answer1))) .body(Jt400Constants.KEY, Matchers.equalTo(key2)); } @Test - public void testMessageQueue() throws AS400SecurityException, ObjectDoesNotExistException, IOException, - InterruptedException, ErrorCompletingRequestException { - String msg = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); + public void testMessageQueue() throws Exception { + LOGGER.debug("** testMessageQueue() ** has started "); + //write + String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); + String answer = "Hello from MQ: " + msg; RestAssured.given() .body(msg) .post("/jt400/messageQueue/write") .then() .statusCode(200) - 
.body(Matchers.equalTo("Hello " + msg)); + .body(Matchers.equalTo(answer)); + + LOGGER.debug("testMessageQueue: message '" + answer + "' was written. "); + //register to delete + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.messageQueue, answer); + + //read (the read message might be different in case the test runs in parallel RestAssured.post("/jt400/messageQueue/read") .then() .statusCode(200) - .body("result", Matchers.is("Hello " + msg)) //check of headers .body(Jt400Constants.SENDER_INFORMATION, Matchers.not(Matchers.empty())) .body(Jt400Constants.MESSAGE_FILE, Matchers.is("")) .body(Jt400Constants.MESSAGE_SEVERITY, Matchers.is(0)) .body(Jt400Constants.MESSAGE_ID, Matchers.is("")) .body(Jt400Constants.MESSAGE_TYPE, Matchers.is(4)) - .body(Jt400Constants.MESSAGE, Matchers.is("QueuedMessage: Hello " + msg)); + .body(Jt400Constants.MESSAGE, Matchers.startsWith("QueuedMessage: Hello ")) + .body("result", Matchers.equalTo(answer)); //Jt400Constants.MESSAGE_DFT_RPY && Jt400Constants.MESSAGE_REPLYTO_KEY are used only for a special // type of message which can not be created by the camel component (*INQUIRY) } @Test - public void testInquiryMessageQueue() throws AS400SecurityException, ObjectDoesNotExistException, IOException, - InterruptedException, ErrorCompletingRequestException { + public void testInquiryMessageQueue() throws Exception { + LOGGER.debug("** testInquiryMessageQueue() **: has started "); String msg = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); + String replyMsg = "reply to: " + msg; + + LOGGER.debug("testInquiryMessageQueue: writing " + msg); //sending a message using the same client as component - RestAssured.given() - .body(msg) - .post("/jt400/client/inquiryMessage/write") - .then() - .statusCode(200); + getClientHelper().sendInquiry(msg); + //register deletion of the message in case some following task fails + QueuedMessage queuedMessage = getClientHelper().peekReplyToQueueMessage(msg); + if 
(queuedMessage != null) { + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, queuedMessage.getKey()); + LOGGER.debug("testInquiryMessageQueue: message confirmed by peek: " + msg); + } + + //set filter for expected messages (for parallel executions) RestAssured.given() - .body(ConfigProvider.getConfig().getValue("cq.jt400.message-replyto-queue", String.class)) - .post("/jt400/client/queuedMessage/read") + .body(msg) + .post("/jt400/inquiryMessageSetExpected") .then() - .statusCode(200) - .body(Matchers.equalTo("reply to: " + msg)); + .statusCode(204); + //start route before sending message (and wait for start) + Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until( + () -> RestAssured.get("/jt400/route/start/inquiryRoute") + .then() + .statusCode(200) + .extract().asString(), + Matchers.is(Boolean.TRUE.toString())); + LOGGER.debug("testInquiryMessageQueue: inquiry route started"); + + //await to be processed + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).until( + () -> RestAssured.get("/jt400/inquiryMessageProcessed") + .then() + .statusCode(200) + .extract().asString(), + Matchers.is(String.valueOf(Boolean.TRUE))); + LOGGER.debug("testInquiryMessageQueue: inquiry message processed"); + + //stop route (and wait for stop) + Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until( + () -> RestAssured.get("/jt400/route/stop/inquiryRoute") + .then() + .statusCode(200) + .extract().asString(), + Matchers.is(Boolean.TRUE.toString())); + LOGGER.debug("testInquiryMessageQueue: inquiry route stooped"); + + //check written message with client + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).until( + () -> getClientHelper().peekReplyToQueueMessage(replyMsg), + Matchers.notNullValue()); + LOGGER.debug("testInquiryMessageQueue: reply message confirmed by peek: " + replyMsg); } @Test @@ -219,27 +271,8 @@ public class Jt400Test { 
.body(Matchers.containsString("hello camel")); } - private static void clearQueue(String queue, BiFunction<AS400, String, Object> readFromQueue) { - String jt400Url = ConfigProvider.getConfig().getValue("cq.jt400.url", String.class); - String jt400Username = ConfigProvider.getConfig().getValue("cq.jt400.username", String.class); - String jt400Password = ConfigProvider.getConfig().getValue("cq.jt400.password", String.class); - String jt400Library = ConfigProvider.getConfig().getValue("cq.jt400.library", String.class); - String jt400MessageQueue = ConfigProvider.getConfig().getValue(queue, String.class); - - String objectPath = String.format("/QSYS.LIB/%s.LIB/%s", jt400Library, jt400MessageQueue); - - AS400 as400 = new AS400(jt400Url, jt400Username, jt400Password); - - int i = 0; - Object msg = null; - //read messages until null is received - do { - msg = readFromQueue.apply(as400, objectPath); - } while (i++ < 10 && msg != null); - - if (i == 10 && msg != null) { - throw new IllegalStateException("There is a message present in a queue!"); - } + private static Jt400ClientHelper getClientHelper() { + return Jt400TestResource.CLIENT_HELPER; } } diff --git a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java new file mode 100644 index 0000000000..3c09d0f562 --- /dev/null +++ b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.quarkus.component.jt400.it;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.ibm.as400.access.AS400;
import com.ibm.as400.access.AS400SecurityException;
import com.ibm.as400.access.DataQueue;
import com.ibm.as400.access.DataQueueEntry;
import com.ibm.as400.access.ErrorCompletingRequestException;
import com.ibm.as400.access.KeyedDataQueue;
import com.ibm.as400.access.KeyedDataQueueEntry;
import com.ibm.as400.access.MessageQueue;
import com.ibm.as400.access.ObjectDoesNotExistException;
import com.ibm.as400.access.QueuedMessage;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Assertions;

/**
 * Test resource which talks directly to the external JT400 server. It does not start anything itself;
 * it only cleans the data written by the tests when the test run stops and exposes {@link #CLIENT_HELPER}
 * for locking, cleanup registration and queue inspection.
 */
public class Jt400TestResource implements QuarkusTestResourceLifecycleManager {
    private static final Logger LOGGER = Logger.getLogger(Jt400TestResource.class);

    /**
     * Kinds of server objects for which values can be registered for removal.
     * (The constant names are part of the API used by the tests, so their spelling is kept as is.)
     */
    public enum RESOURCE_TYPE {
        messageQueue,
        keyedDataQue,
        lifoQueueu,
        replyToQueueu
    }

    private static final Optional<String> JT400_CLEAR_ALL = ConfigProvider.getConfig().getOptionalValue("cq.jt400.clear-all",
            String.class);
    private static final String JT400_URL = ConfigProvider.getConfig().getValue("cq.jt400.url", String.class);
    private static final String JT400_USERNAME = ConfigProvider.getConfig().getValue("cq.jt400.username", String.class);
    private static final String JT400_PASSWORD = ConfigProvider.getConfig().getValue("cq.jt400.password", String.class);
    private static final String JT400_LIBRARY = ConfigProvider.getConfig().getValue("cq.jt400.library", String.class);
    private static final String JT400_MESSAGE_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.message-queue",
            String.class);
    private static final String JT400_REPLY_TO_MESSAGE_QUEUE = ConfigProvider.getConfig().getValue(
            "cq.jt400.message-replyto-queue",
            String.class);
    private static final String JT400_LIFO_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.lifo-queue",
            String.class);
    private static final String JT400_KEYED_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.keyed-queue", String.class);

    //depth of repetitive reads for lifo queue clearing
    private static final int CLEAR_DEPTH = 100;
    public static final String LOCK_KEY = "cq.jt400.global-lock";
    //5 minute timeout (in milliseconds) to obtain a lock for the tests execution
    private static final int LOCK_TIMEOUT = 300000;
    //5 seconds (in milliseconds) added to the lock timeout, to have some spare time for removing old locks
    private static final int LOCK_TIMEOUT_SPARE = 5000;
    //length of the random part of the lock token, the token is "<random>:<timestamp in millis>"
    private static final int LOCK_TOKEN_RANDOM_LENGTH = 10;

    private static final AS400 as400 = new AS400(JT400_URL, JT400_USERNAME, JT400_PASSWORD);

    @Override
    public Map<String, String> start() {
        //no need to start, as the instance already exists
        return Collections.emptyMap();
    }

    /**
     * Clears the data written by the tests (everything, if {@code cq.jt400.clear-all} is {@code true}) and
     * closes the connection. A failure of the cleanup is only logged, the connection is closed in any case.
     */
    @Override
    public void stop() {
        boolean clearEverything = JT400_CLEAR_ALL.map(Boolean::parseBoolean).orElse(false);
        try {
            CLIENT_HELPER.clearAll(clearEverything);
        } catch (Exception e) {
            //data left on the real server may alter following executions, therefore make the failure visible
            LOGGER.warn("Clearing of the external queues failed", e);
        } finally {
            as400.close();
        }
    }

    private static String getObjectPath(String object) {
        return String.format("/QSYS.LIB/%s.LIB/%s", JT400_LIBRARY, object);
    }

    public static Jt400ClientHelper CLIENT_HELPER = new Jt400ClientHelper() {

        //token of this participant stored in the keyed data queue, null if no lock was requested
        private String key = null;
        //values registered by the tests, which have to be removed from the server objects
        private final Map<RESOURCE_TYPE, Set<Object>> toRemove = new HashMap<>();

        @Override
        public QueuedMessage peekReplyToQueueMessage(String msg) throws Exception {
            return getQueueMessage(JT400_REPLY_TO_MESSAGE_QUEUE, msg);
        }

        /**
         * Returns the first message of the given queue with text equal to {@code msg}, or {@code null}
         * if there is no such message.
         */
        private QueuedMessage getQueueMessage(String queue, String msg) throws Exception {
            MessageQueue messageQueue = new MessageQueue(as400,
                    getObjectPath(queue));
            Enumeration<QueuedMessage> msgs = messageQueue.getMessages();

            while (msgs.hasMoreElements()) {
                QueuedMessage queuedMessage = msgs.nextElement();

                if (msg.equals(queuedMessage.getText())) {
                    return queuedMessage;
                }
            }
            return null;
        }

        @Override
        public void registerForRemoval(RESOURCE_TYPE type, Object value) {
            toRemove.computeIfAbsent(type, t -> new HashSet<>()).add(value);
        }

        /**
         * Removes data from the message queue, lifo queue, reply-to queue and keyed queue.
         *
         * @param all if {@code true}, the queues are cleared completely (at most {@code CLEAR_DEPTH} entries of
         *            the lifo queue), otherwise only the values registered via
         *            {@link #registerForRemoval(RESOURCE_TYPE, Object)} are removed
         */
        @Override
        public void clearAll(boolean all) throws Exception {
            //message queue
            MessageQueue mq = new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE));
            if (all) {
                mq.remove();
            } else {
                clearMessageQueue(RESOURCE_TYPE.messageQueue, mq);
            }

            //lifo queue
            DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
            if (all) {
                for (int i = 0; i < CLEAR_DEPTH; i++) {
                    if (dq.read() == null) {
                        break;
                    }
                }
            } else {
                for (Object entry : toRemove.getOrDefault(RESOURCE_TYPE.lifoQueueu, Collections.emptySet())) {
                    removeFromLifoQueue(dq, entry);
                }
            }

            //reply-to queue
            MessageQueue rq = new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
            if (all) {
                rq.remove();
            } else {
                clearMessageQueue(RESOURCE_TYPE.replyToQueueu, rq);
            }

            //keyed queue
            KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
            if (all) {
                kdq.clear();
            } else {
                for (Object entry : toRemove.getOrDefault(RESOURCE_TYPE.keyedDataQue, Collections.emptySet())) {
                    kdq.clear((String) entry);
                }
            }
        }

        /**
         * Reads the lifo queue until the entry equal to {@code entry} is read (and therefore removed) or the
         * queue is empty. All other entries read on the way are written back, even if the reading fails.
         */
        private void removeFromLifoQueue(DataQueue dq, Object entry) throws Exception {
            List<byte[]> otherMessages = new LinkedList<>();
            try {
                DataQueueEntry dqe = dq.read();
                while (dqe != null && !(entry.equals(dqe.getString())
                        || entry.equals(new String(dqe.getData(), StandardCharsets.UTF_8)))) {
                    otherMessages.add(dqe.getData());
                    dqe = dq.read();
                }
            } finally {
                //write back other messages in reverse order (it is a lifo)
                Collections.reverse(otherMessages);
                for (byte[] msg : otherMessages) {
                    dq.write(msg);
                }
            }
        }

        /**
         * Removes the values registered for the given type from the message queue. A registered value is either
         * the text of the message (String) or directly the key of the message (byte[]). Registered texts which
         * are not present in the queue (any more) are skipped.
         */
        private void clearMessageQueue(RESOURCE_TYPE type, MessageQueue mq) throws AS400SecurityException,
                ErrorCompletingRequestException, InterruptedException, IOException, ObjectDoesNotExistException {
            Set<Object> entries = toRemove.get(type);
            if (entries == null || entries.isEmpty()) {
                return;
            }
            List<QueuedMessage> msgs = Collections.list(mq.getMessages());
            //several messages may have the same text, the key of the first one is used
            Map<String, byte[]> keys = new HashMap<>();
            for (QueuedMessage queuedMessage : msgs) {
                keys.putIfAbsent(queuedMessage.getText(), queuedMessage.getKey());
            }
            for (Object entry : entries) {
                byte[] messageKey = entry instanceof String ? keys.get((String) entry) : (byte[]) entry;
                if (messageKey == null) {
                    //nothing to remove, the message is not in the queue
                    continue;
                }
                mq.remove(messageKey);
            }
        }

        /**
         * Keyed data queue (FIFO) is used for locking purposes.
         *
         * - Each participant saves a unique token into the key cq.jt400.global-lock
         * - Each participant then peeks the FIFO queue and if the resulting string is its own unique token,
         * execution is allowed
         * - When execution ends, the token is removed (see {@link #unlock()})
         *
         * If the token is not its own
         * - peek of the token is repeated until timeout or until its own token is returned (so the second
         * participant waits, until the first participant removes its token)
         *
         * Dead lock prevention
         *
         * - part of the unique token is a timestamp, if a participant finds a token, which is too old, the token
         * is removed
         * - action to clear-all data removes also the locking tokens
         *
         * Therefore only 1 token (thus 1 participant) is allowed to run the tests, the others have to wait.
         *
         * Nothing happens if this participant already requested the lock.
         *
         * @throws Exception if the queue can not be accessed or the lock is not obtained in time
         */
        @Override
        public void lock() throws Exception {
            if (key != null) {
                return;
            }
            KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));

            //verify the precondition before the token is created, so a failure does not leave the key set
            Assertions.assertTrue(kdq.isFIFO(), "keyed dataqueue has to be FIFO");

            key = generateKey();
            //write key into keyed queue
            kdq.write(LOCK_KEY, key);

            //LOCK_TIMEOUT is in milliseconds, some spare time is added for removing old locks
            Awaitility.await().pollInterval(1, TimeUnit.SECONDS)
                    .atMost(LOCK_TIMEOUT + LOCK_TIMEOUT_SPARE, TimeUnit.MILLISECONDS)
                    .until(() -> peekLock(kdq), Matchers.is(key));
        }

        /**
         * Returns the token which currently holds the lock (the first token in the FIFO queue), or
         * {@code null} if there is none. If there is no token, the token of this participant is written again.
         * If the token belongs to somebody else and is older than the lock timeout, it is removed.
         */
        private String peekLock(KeyedDataQueue kdq) throws Exception {
            KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
            if (kdqe == null) {
                //if kdqe is null, try to lock again
                LOGGER.debug("locked in the queueu was removed, locking again with " + key);
                kdq.write(LOCK_KEY, key);
            }
            String peekedKey = kdqe == null ? null : kdqe.getString();
            LOGGER.debug("peeked lock " + peekedKey + "(my lock is " + key + ")");

            if (peekedKey != null && !key.equals(peekedKey)) {
                //the timestamp follows the random part and the ':' separator
                long peekedTime = Long.parseLong(peekedKey.substring(LOCK_TOKEN_RANDOM_LENGTH + 1));
                if (System.currentTimeMillis() - peekedTime > LOCK_TIMEOUT) {
                    //read the key (therefore remove it), somebody else might have removed it in the meantime
                    KeyedDataQueueEntry removed = kdq.read(LOCK_KEY);
                    LOGGER.info("Removed old lock " + (removed == null ? null : removed.getString()));
                    KeyedDataQueueEntry next = kdq.peek(LOCK_KEY);
                    peekedKey = next == null ? null : next.getString();
                }
            }
            return peekedKey;
        }

        /**
         * Removes the first token from the lock queue and verifies, that it is the token of this participant.
         * The token of this participant is forgotten in any case, so the next {@link #lock()} writes a new one.
         */
        @Override
        public void unlock() throws Exception {
            try {
                KeyedDataQueueEntry kdqe = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)).read(LOCK_KEY);
                Assertions.assertNotNull(kdqe, "there is no lock in the keyed dataqueue");
                Assertions.assertEquals(key, kdqe.getString());
            } finally {
                //clear key
                key = null;
            }
        }

        private String generateKey() {
            return RandomStringUtils.randomAlphanumeric(LOCK_TOKEN_RANDOM_LENGTH).toLowerCase(Locale.ROOT) + ":"
                    + System.currentTimeMillis();
        }

        /**
         * Returns a textual dump of the message queue, the reply-to (inquiry) queue, the lifo queue and the
         * lock token from the keyed data queue. The lifo queue is dumped by reading (removing) all its entries
         * and writing them back afterwards.
         */
        @Override
        public String dumpQueues() throws Exception {
            StringBuilder sb = new StringBuilder();

            sb.append("\n* MESSAGE QUEUE\n");
            sb.append("\t" + Collections.list(new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).getMessages())
                    .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", ")));

            sb.append("\n* INQUIRY QUEUE\n");
            sb.append("\t" + Collections
                    .list(new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).getMessages())
                    .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", ")));

            sb.append("\n* LIFO QUEUE\n");
            DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
            List<byte[]> lifoMessages = new LinkedList<>();
            List<String> lifoTexts = new LinkedList<>();
            try {
                DataQueueEntry dqe = dq.read();
                while (dqe != null) {
                    //keep the data first, so the entry is written back even if the conversion to text fails
                    lifoMessages.add(dqe.getData());
                    lifoTexts.add(dqe.getString() + " (" + new String(dqe.getData(), StandardCharsets.UTF_8) + ")");
                    dqe = dq.read();
                }
            } finally {
                //write back the messages in reverse order (it is a lifo)
                Collections.reverse(lifoMessages);
                for (byte[] msg : lifoMessages) {
                    dq.write(msg);
                }
            }
            sb.append(String.join(", ", lifoTexts));

            sb.append("\n* KEYED DATA QUEUE\n");
            KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
            KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
            sb.append("\tlock: " + (kdqe == null ? "null" : kdqe.getString()));
            return sb.toString();
        }

        @Override
        public void sendInquiry(String msg) throws Exception {
            new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg,
                    getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
        }
    };

}

/**
 * Operations executed by the tests directly against the JT400 server (not via the tested application).
 */
interface Jt400ClientHelper {

    /** Registers a value which has to be removed from the object of the given type by {@link #clearAll(boolean)}. */
    void registerForRemoval(Jt400TestResource.RESOURCE_TYPE type, Object value);

    /** Returns the message with the given text from the reply-to queue, or {@code null} if there is none. */
    QueuedMessage peekReplyToQueueMessage(String msg) throws Exception;

    /** Sends an inquiry message with the given text to the reply-to queue. */
    void sendInquiry(String msg) throws Exception;

    //------------------- clear listeners ------------------------------

    /** Removes the registered values, or all data if {@code all} is {@code true}. */
    void clearAll(boolean all) throws Exception;

    //----------------------- locking

    /** Waits until this participant owns the global lock. */
    void lock() throws Exception;

    /** Releases the global lock. */
    void unlock() throws Exception;

    /** Returns a textual dump of the content of the queues. */
    String dumpQueues() throws Exception;

}