This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/3.8.x by this push:
     new 74b847a350 Jt400: tests are not cleaning after themselves and parallel 
run fails
74b847a350 is described below

commit 74b847a350c7bd018e4f4e881bfa2d61523d13eb
Author: JiriOndrusek <ondrusek.j...@gmail.com>
AuthorDate: Mon Apr 15 13:07:33 2024 +0200

    Jt400: tests are not cleaning after themselves and parallel run fails
---
 integration-tests/jt400/README.adoc                |  24 +-
 integration-tests/jt400/pom.xml                    |   5 +
 .../component/jt400/it/InquiryMessageHolder.java   |  43 +++
 .../quarkus/component/jt400/it/Jt400Resource.java  |  97 ++++--
 .../quarkus/component/jt400/it/Jt400Routes.java    |  22 +-
 .../src/main/resources/application.properties      |   2 +-
 .../quarkus/component/jt400/it/Jt400Test.java      | 245 +++++++-------
 .../component/jt400/it/Jt400TestResource.java      | 352 +++++++++++++++++++++
 8 files changed, 645 insertions(+), 145 deletions(-)

diff --git a/integration-tests/jt400/README.adoc 
b/integration-tests/jt400/README.adoc
index 0fa4e22359..241dbf8c77 100644
--- a/integration-tests/jt400/README.adoc
+++ b/integration-tests/jt400/README.adoc
@@ -110,4 +110,26 @@ $Env:JT400_KEYED_QUEUE="#lkeyedqueue_if_not_TESTKEYED.DTAQ"
 $Env:JT400_MESSAGE_QUEUE="#messagequeue_if_not_TESTMSGQ.MSGQ"
 $Env:JT400_MESSAGE_REPLYTO_QUEUE="#messagequeueinquiry_if_not_REPLYMSGQ.MSGQ"
 $Env:JT400_USER_SPACE="#userspace_if_not_PROGCALL"
-```
\ No newline at end of file
+```
+
+=== Clear queues after unexpected failures
+
+If tests finishes without unexpected failure, tests are taking care of 
clearing the data.
+In some cases data might stay written into the real server if test fails 
unexpectedly.
+This state should might alter following executions.
+
+To force full clear (of each queue) can be achieved by add ing parameter
+```
+-Dcq.jt400.clear-all=true
+```
+Be aware that with `-Dcq.jt400.clear-all=true`, the tests can not successfully 
finish in parallel run.
+
+Usage of clear queues parameter is *strongly* suggested during development
+
+
+==== Parallel runs and locking
+
+Simple locking mechanism is implemented for the test to allow parallel 
executions.
+
+Whenever test is started, new entry is  written into keyed data queue 
`JT400_KEYED_QUEUE` with the key `cq.jt400.global-lock` and entry is removed 
after the run.
+Tests are able to clear this lock even if previous execution fails 
unexpectedly.
\ No newline at end of file
diff --git a/integration-tests/jt400/pom.xml b/integration-tests/jt400/pom.xml
index 8d8f352c70..22e1efd5fe 100644
--- a/integration-tests/jt400/pom.xml
+++ b/integration-tests/jt400/pom.xml
@@ -71,6 +71,11 @@
             <artifactId>rest-assured</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java
 
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java
new file mode 100644
index 0000000000..18030cc163
--- /dev/null
+++ 
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jt400.it;
+
+import jakarta.inject.Singleton;
+
+@Singleton
+public class InquiryMessageHolder {
+
+    private String messageText;
+
+    private boolean processed = false;
+
+    public String getMessageText() {
+        return messageText;
+    }
+
+    public void setMessageText(String messageText) {
+        this.messageText = messageText;
+    }
+
+    public boolean isProcessed() {
+        return processed;
+    }
+
+    public void setProcessed(boolean processed) {
+        this.processed = processed;
+    }
+}
diff --git 
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
 
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
index c65f268978..9fd65df78c 100644
--- 
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
+++ 
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
@@ -16,19 +16,19 @@
  */
 package org.apache.camel.quarkus.component.jt400.it;
 
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import com.ibm.as400.access.AS400;
-import com.ibm.as400.access.MessageQueue;
 import com.ibm.as400.access.QueuedMessage;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.GET;
 import jakarta.ws.rs.POST;
 import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
@@ -80,6 +80,9 @@ public class Jt400Resource {
     @Inject
     CamelContext context;
 
+    @Inject
+    InquiryMessageHolder inquiryMessageHolder;
+
     @Path("/dataQueue/read/")
     @POST
     @Produces(MediaType.APPLICATION_JSON)
@@ -101,7 +104,7 @@ public class Jt400Resource {
         Exchange ex = 
consumerTemplate.receive(getUrlForLibrary(suffix.toString()));
 
         if ("binary".equals(format)) {
-            return generateResponse(new 
String(ex.getIn().getBody(byte[].class), Charset.forName("Cp037")), ex);
+            return generateResponse(new 
String(ex.getIn().getBody(byte[].class), StandardCharsets.UTF_8), ex);
         }
         return generateResponse(ex.getIn().getBody(String.class), ex);
 
@@ -112,61 +115,85 @@ public class Jt400Resource {
     @Consumes(MediaType.TEXT_PLAIN)
     @Produces(MediaType.TEXT_PLAIN)
     public Response keyedDataQueueWrite(@QueryParam("key") String key,
-            @QueryParam("searchType") String searchType,
+            @QueryParam("format") String format,
             String data) {
+        String _format = Optional.ofNullable(format).orElse("text");
         boolean keyed = key != null;
         StringBuilder suffix = new StringBuilder();
         Map<String, Object> headers = new HashMap<>();
+        String msg;
 
         if (keyed) {
-            suffix.append(jt400KeyedQueue).append("?keyed=true");
+            
suffix.append(jt400KeyedQueue).append("?keyed=true").append("&format=").append(_format);
             headers.put(Jt400Endpoint.KEY, key);
+            msg = "Hello From KDQ: " + data;
         } else {
-            suffix.append(jt400LifoQueue);
+            suffix.append(jt400LifoQueue).append("?format=").append(_format);
+            msg = "Hello From DQ: " + data;
         }
 
-        Object ex = producerTemplate.requestBodyAndHeaders(
-                getUrlForLibrary(suffix.toString()),
-                "Hello " + data,
-                headers);
-        return Response.ok().entity(ex).build();
+        Object retVal;
+        if ("binary".equals(format)) {
+            byte[] result = (byte[]) producerTemplate.requestBodyAndHeaders(
+                    getUrlForLibrary(suffix.toString()),
+                    ("Hello (bin) " + data).getBytes(StandardCharsets.UTF_8),
+                    headers);
+            retVal = new String(result, StandardCharsets.UTF_8);
+        } else {
+            retVal = producerTemplate.requestBodyAndHeaders(
+                    getUrlForLibrary(suffix.toString()),
+                    msg,
+                    headers);
+        }
+
+        return Response.ok().entity(retVal).build();
     }
 
-    @Path("/client/inquiryMessage/write/")
-    @POST
+    @Path("/route/start/{route}")
+    @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response clientInquiryMessageWrite(String data) throws Exception {
-        Jt400Endpoint jt400Endpoint = 
context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue), 
Jt400Endpoint.class);
-        AS400 as400 = jt400Endpoint.getConfiguration().getConnection();
-        //send inquiry message (with the same client as is used in the 
component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another 
job`.
-        MessageQueue queue = new MessageQueue(as400, 
jt400Endpoint.getConfiguration().getObjectPath());
-        try {
-            queue.sendInquiry(data, "/QSYS.LIB/" + jt400Library + ".LIB/" + 
jt400MessageReplyToQueue);
-        } catch (Exception e) {
-            return Response.status(500).entity(e.getMessage()).build();
+    public Response startRoute(@PathParam("route") String routeName) throws 
Exception {
+        if 
(context.getRouteController().getRouteStatus(routeName).isStartable()) {
+            context.getRouteController().startRoute(routeName);
         }
-        return Response.ok().build();
+
+        return 
Response.ok().entity(context.getRouteController().getRouteStatus(routeName).isStarted()).build();
     }
 
-    @Path("/client/queuedMessage/read")
-    @POST
+    @Path("/route/stop/{route}")
+    @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response clientQueuedMessageRead(String queueName) throws Exception 
{
+    public Response stopRoute(@PathParam("route") String routeName) throws 
Exception {
+        if 
(context.getRouteController().getRouteStatus(routeName).isStoppable()) {
+            context.getRouteController().stopRoute(routeName);
+        }
+        boolean resp = 
context.getRouteController().getRouteStatus(routeName).isStopped();
+
+        //stop component to avoid CPF2451 Message queue REPLYMSGQ is allocated 
to another job.
+        Jt400Endpoint jt400Endpoint = 
context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue), 
Jt400Endpoint.class);
+        jt400Endpoint.close();
 
-        Jt400Endpoint jt400Endpoint = 
context.getEndpoint(getUrlForLibrary(queueName), Jt400Endpoint.class);
-        AS400 as400 = jt400Endpoint.getConfiguration().getConnection();
-        //send inquiry message (with the same client as is used in the 
component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another 
job`.
-        MessageQueue queue = new MessageQueue(as400, 
jt400Endpoint.getConfiguration().getObjectPath());
-        QueuedMessage message = queue.receive(null);
+        return Response.ok().entity(resp).build();
+    }
 
-        return Response.ok().entity(message != null ? message.getText() : 
"").build();
+    @Path("/inquiryMessageSetExpected")
+    @POST
+    public void inquiryMessageSetExpected(String msg) {
+        inquiryMessageHolder.setMessageText(msg);
+    }
+
+    @Path("/inquiryMessageProcessed")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String inquiryMessageProcessed() {
+        return String.valueOf(inquiryMessageHolder.isProcessed());
     }
 
     @Path("/messageQueue/write/")
     @POST
     @Produces(MediaType.TEXT_PLAIN)
     public Response messageQueueWrite(String data) {
-        Object ex = 
producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello " + 
data);
+        Object ex = 
producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello from 
MQ: " + data);
 
         return Response.ok().entity(ex).build();
     }
@@ -176,7 +203,7 @@ public class Jt400Resource {
     @Produces(MediaType.APPLICATION_JSON)
     public Response messageQueueRead(@QueryParam("queue") String queue) {
         Exchange ex = consumerTemplate
-                .receive(getUrlForLibrary(queue == null ? jt400MessageQueue : 
queue));
+                .receive(getUrlForLibrary(queue == null ? jt400MessageQueue : 
queue) + "?messageAction=SAME");
 
         return generateResponse(ex.getIn().getBody(String.class), ex);
     }
diff --git 
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
 
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
index 5615454c86..a3ce493919 100644
--- 
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
+++ 
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
@@ -18,12 +18,15 @@ package org.apache.camel.quarkus.component.jt400.it;
 
 import com.ibm.as400.access.AS400Message;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.jt400.Jt400Constants;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
 
 @ApplicationScoped
 public class Jt400Routes extends RouteBuilder {
+    private static final Logger LOGGER = Logger.getLogger(Jt400Routes.class);
 
     @ConfigProperty(name = "cq.jt400.library")
     String jt400Library;
@@ -40,16 +43,31 @@ public class Jt400Routes extends RouteBuilder {
     @ConfigProperty(name = "cq.jt400.message-replyto-queue")
     String jt400MessageReplyToQueue;
 
+    @Inject
+    InquiryMessageHolder inquiryMessageHolder;
+
     @Override
     public void configure() throws Exception {
         from(getUrlForLibrary(jt400MessageReplyToQueue + "?sendingReply=true"))
+                .id("inquiryRoute")
+                //route has tobe stopped to avoid "CPF2451 Message queue 
REPLYMSGQ is allocated to another job."
+                .autoStartup(false)
                 .choice()
                 
.when(header(Jt400Constants.MESSAGE_TYPE).isEqualTo(AS400Message.INQUIRY))
                 .process((exchange) -> {
-                    String reply = "reply to: " + 
exchange.getIn().getBody(String.class);
+                    String msg = exchange.getIn().getBody(String.class);
+                    LOGGER.debug(
+                            "Inquiry route: received '" + msg + "' (expecting  
'" + inquiryMessageHolder.getMessageText()
+                                    + "')");
+                    if (inquiryMessageHolder.getMessageText() != null && 
!inquiryMessageHolder.getMessageText().equals(msg)) {
+                        throw new IllegalStateException(
+                                "Intentional! Current exchange is not 
triggered by current test process, therefore ignoring the exchange");
+                    }
+                    String reply = "reply to: " + msg;
                     exchange.getIn().setBody(reply);
                 })
-                .to(getUrlForLibrary(jt400MessageReplyToQueue));
+                .to(getUrlForLibrary(jt400MessageReplyToQueue))
+                .process(e -> inquiryMessageHolder.setProcessed(true));
     }
 
     private String getUrlForLibrary(String suffix) {
diff --git a/integration-tests/jt400/src/main/resources/application.properties 
b/integration-tests/jt400/src/main/resources/application.properties
index db47b32aab..296c10e1a0 100644
--- a/integration-tests/jt400/src/main/resources/application.properties
+++ b/integration-tests/jt400/src/main/resources/application.properties
@@ -31,4 +31,4 @@ cq.jt400.user-space=${JT400_USER_SPACE:PROGCALL}
 cq.jt400.message-queue=${JT400_MESSAGE_QUEUE:TESTMSGQ.MSGQ}
 cq.jt400.message-replyto-queue=${JT400_MESSAGE_REPLYTO_QUEUE:REPLYMSGQ.MSGQ}
 cq.jt400.keyed-queue=${JT400_KEYED_QUEUE:TESTKEYED.DTAQ}
-cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
+cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
\ No newline at end of file
diff --git 
a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
 
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
index c1168b7644..3627dd198a 100644
--- 
a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
+++ 
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
@@ -16,117 +16,117 @@
  */
 package org.apache.camel.quarkus.component.jt400.it;
 
-import java.io.IOException;
 import java.util.Locale;
-import java.util.function.BiFunction;
-
-import com.ibm.as400.access.AS400;
-import com.ibm.as400.access.AS400SecurityException;
-import com.ibm.as400.access.ErrorCompletingRequestException;
-import com.ibm.as400.access.KeyedDataQueue;
-import com.ibm.as400.access.MessageQueue;
-import com.ibm.as400.access.ObjectDoesNotExistException;
+import java.util.concurrent.TimeUnit;
+
+import com.ibm.as400.access.QueuedMessage;
+import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import org.apache.camel.component.jt400.Jt400Constants;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.eclipse.microprofile.config.ConfigProvider;
+import org.awaitility.Awaitility;
 import org.hamcrest.Matchers;
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 
 @QuarkusTest
 @EnabledIfEnvironmentVariable(named = "JT400_URL", matches = ".+")
+@QuarkusTestResource(Jt400TestResource.class)
 public class Jt400Test {
+    private static final Logger LOGGER = Logger.getLogger(Jt400Test.class);
+
+    private final int MSG_LENGTH = 20;
+    //tests may be executed in parallel, therefore the timeout is a little 
bigger in case the test has to wait for another one
+    private final int WAIT_IN_SECONDS = 20;
 
     @BeforeAll
     public static void beforeAll() throws Exception {
-        //read all messages from the queues to be sure that they are empty
-
-        //clear reply-to message queue
-        clearQueue("cq.jt400.message-replyto-queue",
-                (as400, path) -> {
-                    try {
-                        return new MessageQueue(as400, path).receive(null);
-                    } catch (Exception e) {
-                        return null;
-                    }
-                });
-
-        //clear  message queue
-        clearQueue("cq.jt400.message-queue",
-                (as400, path) -> {
-                    try {
-                        return new MessageQueue(as400, path).receive(null);
-                    } catch (Exception e) {
-                        return null;
-                    }
-                });
-
-        //clear  keyed queue for key1
-        clearQueue("cq.jt400.message-queue",
-                (as400, path) -> {
-                    try {
-                        return new KeyedDataQueue(as400, path).read("key1");
-                    } catch (Exception e) {
-                        return null;
-                    }
-                });
-
-        //clear  keyed queue for key2
-        clearQueue("cq.jt400.message-queue",
-                (as400, path) -> {
-                    try {
-                        return new KeyedDataQueue(as400, path).read("key1");
-                    } catch (Exception e) {
-                        return null;
-                    }
-                });
+        //for development purposes
+        //        logQueues();
+
+        //lock execution
+        Jt400TestResource.CLIENT_HELPER.lock();
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        getClientHelper().unlock();
+    }
+
+    private static void logQueues() throws Exception {
+        StringBuilder sb = new StringBuilder("\n");
+        
sb.append("**********************************************************");
+        sb.append(getClientHelper().dumpQueues());
+        
sb.append("\n**********************************************************\n");
+        LOGGER.info(sb.toString());
     }
 
     @Test
     public void testDataQueue() {
-        String msg = 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+        LOGGER.debug("** testDataQueue() ** has started ");
+
+        String msg = 
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+        String answer = "Hello From DQ: " + msg;
 
         RestAssured.given()
                 .body(msg)
                 .post("/jt400/dataQueue/write")
                 .then()
                 .statusCode(200)
-                .body(Matchers.equalTo("Hello " + msg));
+                .body(Matchers.equalTo(answer));
+
+        LOGGER.debug("testDataQueue: message '" + answer + "' was written. ");
+
+        
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.lifoQueueu,
 answer);
 
         RestAssured.post("/jt400/dataQueue/read")
                 .then()
                 .statusCode(200)
-                .body("result", Matchers.equalTo("Hello " + msg));
+                .body("result", Matchers.equalTo(answer));
     }
 
     @Test
-    public void testDataQueueBinary() {
-        String msg = 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+    public void testDataQueueBinary() throws Exception {
+        LOGGER.debug("** testDataQueueBinary() ** has started ");
+        String msg = 
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+        String answer = "Hello (bin) " + msg;
 
         RestAssured.given()
                 .body(msg)
+                .queryParam("format", "binary")
                 .post("/jt400/dataQueue/write")
                 .then()
                 .statusCode(200)
-                .body(Matchers.equalTo("Hello " + msg));
+                .body(Matchers.equalTo(answer));
+
+        LOGGER.debug("testDataQueueBinary: message '" + answer + "' was 
written. ");
+
+        //register to delete
+        
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.lifoQueueu,
 answer);
 
         RestAssured.given()
                 .queryParam("format", "binary")
                 .post("/jt400/dataQueue/read")
                 .then()
                 .statusCode(200)
-                .body("result", Matchers.equalTo("Hello " + msg));
+                .body("result", Matchers.equalTo(answer));
     }
 
     @Test
     public void testKeyedDataQueue() {
-        String msg1 = 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
-        String msg2 = 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
-        String key1 = "key1";
-        String key2 = "key2";
+        LOGGER.debug("** testKeyedDataQueue() ** has started ");
+        String msg1 = 
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+        String msg2 = 
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+        String answer1 = "Hello From KDQ: " + msg1;
+        String answer2 = "Hello From KDQ: " + msg2;
+
+        String key1 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH - 
1).toLowerCase(Locale.ROOT);
+        //key2 is right after key1
+        String key2 = key1 + "a";
 
         RestAssured.given()
                 .body(msg1)
@@ -134,79 +134,131 @@ public class Jt400Test {
                 .post("/jt400/dataQueue/write/")
                 .then()
                 .statusCode(200)
-                .body(Matchers.equalTo("Hello " + msg1));
+                .body(Matchers.equalTo(answer1));
+
+        LOGGER.debug("testKeyedDataQueue: message '" + answer1 + " (key " + 
key1 + ") was written. ");
+        
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.keyedDataQue,
 key1);
 
         RestAssured.given()
-                .body("Sheldon2")
+                .body(msg2)
                 .queryParam("key", key2)
                 .post("/jt400/dataQueue/write/")
                 .then()
                 .statusCode(200)
-                .body(Matchers.equalTo("Hello Sheldon2"));
+                .body(Matchers.equalTo(answer2));
+
+        LOGGER.debug("testKeyedDataQueue: message '" + answer2 + " (key " + 
key2 + ") was written. ");
+        
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.keyedDataQue,
 key2);
 
         RestAssured.given()
                 .body(key1)
                 .post("/jt400/dataQueue/read/")
                 .then()
                 .statusCode(200)
-                .body("result", Matchers.equalTo("Hello " + msg1))
+                .body("result", Matchers.equalTo(answer1))
                 .body(Jt400Constants.KEY, Matchers.equalTo(key1));
 
         RestAssured.given()
                 .body(key1)
-                .queryParam("searchType", "NE")
+                .queryParam("searchType", "GE")
                 .post("/jt400/dataQueue/read/")
                 .then()
                 .statusCode(200)
-                .body("result", Matchers.not(Matchers.equalTo("Hello " + 
msg2)))
+                .body("result", Matchers.not(Matchers.equalTo(answer1)))
                 .body(Jt400Constants.KEY, Matchers.equalTo(key2));
     }
 
     @Test
-    public void testMessageQueue() throws AS400SecurityException, 
ObjectDoesNotExistException, IOException,
-            InterruptedException, ErrorCompletingRequestException {
-        String msg = 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+    public void testMessageQueue() throws Exception {
+        LOGGER.debug("** testMessageQueue() ** has started ");
+        //write
+        String msg = 
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+        String answer = "Hello from MQ: " + msg;
 
         RestAssured.given()
                 .body(msg)
                 .post("/jt400/messageQueue/write")
                 .then()
                 .statusCode(200)
-                .body(Matchers.equalTo("Hello " + msg));
+                .body(Matchers.equalTo(answer));
+
+        LOGGER.debug("testMessageQueue: message '" + answer + "' was written. 
");
+        //register to delete
+        
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.messageQueue,
 answer);
+
+        //read (the read message might be different in case the test runs in 
parallel
 
         RestAssured.post("/jt400/messageQueue/read")
                 .then()
                 .statusCode(200)
-                .body("result", Matchers.is("Hello " + msg))
                 //check of headers
                 .body(Jt400Constants.SENDER_INFORMATION, 
Matchers.not(Matchers.empty()))
                 .body(Jt400Constants.MESSAGE_FILE, Matchers.is(""))
                 .body(Jt400Constants.MESSAGE_SEVERITY, Matchers.is(0))
                 .body(Jt400Constants.MESSAGE_ID, Matchers.is(""))
                 .body(Jt400Constants.MESSAGE_TYPE, Matchers.is(4))
-                .body(Jt400Constants.MESSAGE, Matchers.is("QueuedMessage: 
Hello " + msg));
+                .body(Jt400Constants.MESSAGE, 
Matchers.startsWith("QueuedMessage: Hello "))
+                .body("result", Matchers.equalTo(answer));
         //Jt400Constants.MESSAGE_DFT_RPY && Jt400Constants.MESSAGE_REPLYTO_KEY 
are used only for a special
         // type of message which can not be created by the camel component 
(*INQUIRY)
     }
 
     @Test
-    public void testInquiryMessageQueue() throws AS400SecurityException, 
ObjectDoesNotExistException, IOException,
-            InterruptedException, ErrorCompletingRequestException {
+    public void testInquiryMessageQueue() throws Exception {
+        LOGGER.debug("** testInquiryMessageQueue() **: has started ");
         String msg = 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+        String replyMsg = "reply to: " + msg;
+
+        LOGGER.debug("testInquiryMessageQueue: writing " + msg);
 
         //sending a message using the same client as component
-        RestAssured.given()
-                .body(msg)
-                .post("/jt400/client/inquiryMessage/write")
-                .then()
-                .statusCode(200);
+        getClientHelper().sendInquiry(msg);
 
+        //register deletion of the message in case some following task fails
+        QueuedMessage queuedMessage = 
getClientHelper().peekReplyToQueueMessage(msg);
+        if (queuedMessage != null) {
+            
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu,
 queuedMessage.getKey());
+            LOGGER.debug("testInquiryMessageQueue: message confirmed by peek: 
" + msg);
+        }
+
+        //set filter for expected messages (for parallel executions)
         RestAssured.given()
-                
.body(ConfigProvider.getConfig().getValue("cq.jt400.message-replyto-queue", 
String.class))
-                .post("/jt400/client/queuedMessage/read")
+                .body(msg)
+                .post("/jt400/inquiryMessageSetExpected")
                 .then()
-                .statusCode(200)
-                .body(Matchers.equalTo("reply to: " + msg));
+                .statusCode(204);
+        //start route before sending message (and wait for start)
+        Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
+                () -> RestAssured.get("/jt400/route/start/inquiryRoute")
+                        .then()
+                        .statusCode(200)
+                        .extract().asString(),
+                Matchers.is(Boolean.TRUE.toString()));
+        LOGGER.debug("testInquiryMessageQueue: inquiry route started");
+
+        //await to be processed
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20, 
TimeUnit.SECONDS).until(
+                () -> RestAssured.get("/jt400/inquiryMessageProcessed")
+                        .then()
+                        .statusCode(200)
+                        .extract().asString(),
+                Matchers.is(String.valueOf(Boolean.TRUE)));
+        LOGGER.debug("testInquiryMessageQueue: inquiry message processed");
+
+        //stop route (and wait for stop)
+        Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
+                () -> RestAssured.get("/jt400/route/stop/inquiryRoute")
+                        .then()
+                        .statusCode(200)
+                        .extract().asString(),
+                Matchers.is(Boolean.TRUE.toString()));
+        LOGGER.debug("testInquiryMessageQueue: inquiry route stooped");
+
+        //check written message with client
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20, 
TimeUnit.SECONDS).until(
+                () -> getClientHelper().peekReplyToQueueMessage(replyMsg),
+                Matchers.notNullValue());
+        LOGGER.debug("testInquiryMessageQueue: reply message confirmed by 
peek: " + replyMsg);
     }
 
     @Test
@@ -219,27 +271,8 @@ public class Jt400Test {
                 .body(Matchers.containsString("hello camel"));
     }
 
-    private static void clearQueue(String queue, BiFunction<AS400, String, 
Object> readFromQueue) {
-        String jt400Url = ConfigProvider.getConfig().getValue("cq.jt400.url", 
String.class);
-        String jt400Username = 
ConfigProvider.getConfig().getValue("cq.jt400.username", String.class);
-        String jt400Password = 
ConfigProvider.getConfig().getValue("cq.jt400.password", String.class);
-        String jt400Library = 
ConfigProvider.getConfig().getValue("cq.jt400.library", String.class);
-        String jt400MessageQueue = ConfigProvider.getConfig().getValue(queue, 
String.class);
-
-        String objectPath = String.format("/QSYS.LIB/%s.LIB/%s", jt400Library, 
jt400MessageQueue);
-
-        AS400 as400 = new AS400(jt400Url, jt400Username, jt400Password);
-
-        int i = 0;
-        Object msg = null;
-        //read messages until null is received
-        do {
-            msg = readFromQueue.apply(as400, objectPath);
-        } while (i++ < 10 && msg != null);
-
-        if (i == 10 && msg != null) {
-            throw new IllegalStateException("There is a message present in a 
queue!");
-        }
+    private static Jt400ClientHelper getClientHelper() {
+        return Jt400TestResource.CLIENT_HELPER;
     }
 
 }
diff --git 
a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
 
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
new file mode 100644
index 0000000000..3c09d0f562
--- /dev/null
+++ 
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jt400.it;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.ibm.as400.access.AS400;
+import com.ibm.as400.access.AS400SecurityException;
+import com.ibm.as400.access.DataQueue;
+import com.ibm.as400.access.DataQueueEntry;
+import com.ibm.as400.access.ErrorCompletingRequestException;
+import com.ibm.as400.access.KeyedDataQueue;
+import com.ibm.as400.access.KeyedDataQueueEntry;
+import com.ibm.as400.access.MessageQueue;
+import com.ibm.as400.access.ObjectDoesNotExistException;
+import com.ibm.as400.access.QueuedMessage;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.hamcrest.Matchers;
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.Assertions;
+
+public class Jt400TestResource implements QuarkusTestResourceLifecycleManager {
+    private static final Logger LOGGER = 
Logger.getLogger(Jt400TestResource.class);
+
+    public static enum RESOURCE_TYPE {
+        messageQueue,
+        keyedDataQue,
+        lifoQueueu,
+        replyToQueueu;
+    }
+
+    private static final Optional<String> JT400_CLEAR_ALL = 
ConfigProvider.getConfig().getOptionalValue("cq.jt400.clear-all",
+            String.class);
+    private static final String JT400_URL = 
ConfigProvider.getConfig().getValue("cq.jt400.url", String.class);
+    private static final String JT400_USERNAME = 
ConfigProvider.getConfig().getValue("cq.jt400.username", String.class);
+    private static final String JT400_PASSWORD = 
ConfigProvider.getConfig().getValue("cq.jt400.password", String.class);
+    private static final String JT400_LIBRARY = 
ConfigProvider.getConfig().getValue("cq.jt400.library", String.class);
+    private static final String JT400_MESSAGE_QUEUE = 
ConfigProvider.getConfig().getValue("cq.jt400.message-queue",
+            String.class);
+    private static final String JT400_REPLY_TO_MESSAGE_QUEUE = 
ConfigProvider.getConfig().getValue(
+            "cq.jt400.message-replyto-queue",
+            String.class);
+    private static final String JT400_LIFO_QUEUE = 
ConfigProvider.getConfig().getValue("cq.jt400.lifo-queue",
+            String.class);
+    private static final String JT400_KEYED_QUEUE = 
ConfigProvider.getConfig().getValue("cq.jt400.keyed-queue", String.class);
+
+    //depth of repetitive reads for lifo queue clearing
+    private final static int CLEAR_DEPTH = 100;
+    public final static String LOCK_KEY = "cq.jt400.global-lock";
+    //5 minute timeout to obtain a log for the tests execution
+    private final static int LOCK_TIMEOUT = 300000;
+
+    private static AS400 as400 = new AS400(JT400_URL, JT400_USERNAME, 
JT400_PASSWORD);;
+
+    @Override
+    public Map<String, String> start() {
+        //no need to start, as the instance already exists
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void stop() {
+        if (as400 != null) {
+            try {
+                CLIENT_HELPER.clearAll(JT400_CLEAR_ALL.isPresent() && 
Boolean.parseBoolean(JT400_CLEAR_ALL.get()));
+            } catch (Exception e) {
+                LOGGER.debug("Clearing of the external queues failed", e);
+            }
+            as400.close();
+        }
+    }
+
+    private static String getObjectPath(String object) {
+        return String.format("/QSYS.LIB/%s.LIB/%s", JT400_LIBRARY, object);
+    }
+
+    public static Jt400ClientHelper CLIENT_HELPER = new Jt400ClientHelper() {
+
+        private String key = null;
+        Map<RESOURCE_TYPE, Set<Object>> toRemove = new HashMap<>();
+
+        @Override
+        public QueuedMessage peekReplyToQueueMessage(String msg) throws 
Exception {
+            return getQueueMessage(JT400_REPLY_TO_MESSAGE_QUEUE, msg);
+        }
+
+        private QueuedMessage getQueueMessage(String queue, String msg) throws 
Exception {
+            MessageQueue messageQueue = new MessageQueue(as400,
+                    getObjectPath(queue));
+            Enumeration<QueuedMessage> msgs = messageQueue.getMessages();
+
+            while (msgs.hasMoreElements()) {
+                QueuedMessage queuedMessage = msgs.nextElement();
+
+                if (msg.equals(queuedMessage.getText())) {
+                    return queuedMessage;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public void registerForRemoval(RESOURCE_TYPE type, Object value) {
+            if (toRemove.containsKey(type)) {
+                toRemove.get(type).add(value);
+            } else {
+                Set<Object> set = new HashSet<>();
+                set.add(value);
+                toRemove.put(type, set);
+            }
+        }
+
+        @Override
+        public void clearAll(boolean all) throws Exception {
+            //message queue
+            MessageQueue mq = new MessageQueue(as400, 
getObjectPath(JT400_MESSAGE_QUEUE));
+            if (all) {
+                mq.remove();
+            } else if (toRemove.containsKey(RESOURCE_TYPE.messageQueue)) {
+                clearMessageQueue(RESOURCE_TYPE.messageQueue, mq);
+            }
+
+            //lifo queue
+            DataQueue dq = new DataQueue(as400, 
getObjectPath(JT400_LIFO_QUEUE));
+            if (all) {
+                for (int i = 01; i < CLEAR_DEPTH; i++) {
+                    if (dq.read() == null) {
+                        break;
+                    }
+                }
+            } else if (toRemove.containsKey(RESOURCE_TYPE.lifoQueueu)) {
+                for (Object entry : toRemove.get(RESOURCE_TYPE.lifoQueueu)) {
+                    List<byte[]> otherMessages = new LinkedList<>();
+                    DataQueueEntry dqe = dq.read();
+                    while (dqe != null && !(entry.equals(dqe.getString())
+                            || entry.equals(new String(dqe.getData(), 
StandardCharsets.UTF_8)))) {
+                        otherMessages.add(dqe.getData());
+                        dqe = dq.read();
+                    }
+                    //write back other messages in reverse order (it is a lifo)
+                    Collections.reverse(otherMessages);
+                    for (byte[] msg : otherMessages) {
+                        dq.write(msg);
+                    }
+                }
+            }
+            //reply-to queue
+            MessageQueue rq = new MessageQueue(as400, 
getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
+            if (all) {
+                rq.remove();
+            } else if (toRemove.containsKey(RESOURCE_TYPE.replyToQueueu)) {
+                clearMessageQueue(RESOURCE_TYPE.replyToQueueu, rq);
+            }
+
+            //keyed queue
+            KeyedDataQueue kdq = new KeyedDataQueue(as400, 
getObjectPath(JT400_KEYED_QUEUE));
+            if (all) {
+                kdq.clear();
+            } else if (toRemove.containsKey(RESOURCE_TYPE.keyedDataQue)) {
+                for (Object entry : toRemove.get(RESOURCE_TYPE.keyedDataQue)) {
+                    kdq.clear((String) entry);
+                }
+            }
+        }
+
+        private void clearMessageQueue(RESOURCE_TYPE type, MessageQueue mq) 
throws AS400SecurityException,
+                ErrorCompletingRequestException, InterruptedException, 
IOException, ObjectDoesNotExistException {
+            if (!toRemove.get(type).isEmpty()) {
+                List<QueuedMessage> msgs = Collections.list(mq.getMessages());
+                Map<String, byte[]> keys = 
msgs.stream().collect(Collectors.toMap(q -> q.getText(), q -> q.getKey()));
+                for (Object entry : toRemove.get(type)) {
+                    if (entry instanceof String) {
+                        mq.remove(keys.get((String) entry));
+                    } else {
+                        mq.remove((byte[]) entry);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Keyed dataque (FIFO) is used for locking purposes.
+         *
+         * - Each participant saves unique token into a key 
cq.jt400.global-lock
+         * - Each participant the reads the FIFO queue and if the resulted 
string is its own unique token, execution is allowed
+         * - When execution ends, the key is removed
+         *
+         * If the token is not its own
+         * -read of the token is repeated until timeout or its own token is 
returned (so the second participant waits, until the
+         * first participant removes its token)
+         *
+         * Dead lock prevention
+         *
+         * - part of the unique token is timestamp, if participant finds a 
token, which is too old, token is removed
+         * - action to clear-all data removes also the locking tokens
+         *
+         *
+         * Therefore only 1 token (thus 1 participant) is allowed to run the 
tests, the others have to wait
+         *
+         * @throws Exception
+         */
+        @Override
+        public void lock() throws Exception {
+            if (key == null) {
+                key = generateKey();
+                //write key into keyed queue
+                KeyedDataQueue kdq = new KeyedDataQueue(as400, 
getObjectPath(JT400_KEYED_QUEUE));
+
+                Assertions.assertTrue(kdq.isFIFO(), "keyed dataqueue has to be 
FIFO");
+
+                kdq.write(LOCK_KEY, key);
+
+                //added 5 seconds for the timeout, to have some spare time for 
removing old locks
+                Awaitility.await().pollInterval(1, 
TimeUnit.SECONDS).atMost(LOCK_TIMEOUT + 5000, TimeUnit.SECONDS)
+                        .until(
+                                () -> {
+                                    KeyedDataQueueEntry kdqe = 
kdq.peek(LOCK_KEY);
+                                    if (kdqe == null) {
+                                        //if kdqe is null, try to lock again
+                                        LOGGER.debug("locked in the queueu was 
removed, locking again with " + key);
+                                        kdq.write(LOCK_KEY, key);
+                                    }
+                                    String peekedKey = kdqe == null ? null : 
kdqe.getString();
+                                    //if waiting takes more than 300s, check 
whether the actual lock can be removed
+                                    LOGGER.debug("peeked lock " + peekedKey + 
"(my lock is " + key + ")");
+
+                                    if (peekedKey != null && 
!key.equals(peekedKey)) {
+                                        long peekedTime = 
Long.parseLong(peekedKey.substring(11));
+                                        if (System.currentTimeMillis() - 
peekedTime > LOCK_TIMEOUT) {
+                                            //read the key (therefore remove 
it)
+                                            String readKey = 
kdq.read(LOCK_KEY).getString();
+                                            System.out.println("Removed old 
lock " + readKey);
+                                            peekedKey = 
kdq.peek(LOCK_KEY).getString();
+                                        }
+                                    }
+                                    return peekedKey;
+                                },
+                                Matchers.is(key));
+            }
+        }
+
+        @Override
+        public void unlock() throws Exception {
+            Assertions.assertEquals(key,
+                    new KeyedDataQueue(as400, 
getObjectPath(JT400_KEYED_QUEUE)).read(LOCK_KEY).getString());
+            //clear key
+            key = null;
+        }
+
+        private String generateKey() {
+            return 
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT) + ":" + 
System.currentTimeMillis();
+        }
+
+        @Override
+        public String dumpQueues() throws Exception {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("\n* MESSAGE QUEUE\n");
+            sb.append("\t" + Collections.list(new MessageQueue(as400, 
getObjectPath(JT400_MESSAGE_QUEUE)).getMessages())
+                    .stream().map(mq -> 
mq.getText()).sorted().collect(Collectors.joining(", ")));
+
+            sb.append("\n* INQUIRY QUEUE\n");
+            sb.append("\t" + Collections
+                    .list(new MessageQueue(as400, 
getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).getMessages())
+                    .stream().map(mq -> 
mq.getText()).sorted().collect(Collectors.joining(", ")));
+
+            sb.append("\n* LIFO QUEUE\n");
+            DataQueue dq = new DataQueue(as400, 
getObjectPath(JT400_LIFO_QUEUE));
+            DataQueueEntry dqe;
+            List<byte[]> lifoMessages = new LinkedList<>();
+            List<String> lifoTexts = new LinkedList<>();
+            do {
+                dqe = dq.read();
+                if (dqe != null) {
+                    lifoTexts.add(dqe.getString() + " (" + new 
String(dqe.getData(), StandardCharsets.UTF_8) + ")");
+                    lifoMessages.add(dqe.getData());
+                }
+            } while (dqe != null);
+
+            //write back other messages in reverse order (it is a lifo)
+            Collections.reverse(lifoMessages);
+            for (byte[] msg : lifoMessages) {
+                dq.write(msg);
+            }
+            sb.append(lifoTexts.stream().collect(Collectors.joining(", ")));
+
+            sb.append("\n* KEYED DATA QUEUE\n");
+            KeyedDataQueue kdq = new KeyedDataQueue(as400, 
getObjectPath(JT400_KEYED_QUEUE));
+            KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
+            sb.append("\tlock: " + (kdqe == null ? "null" : kdqe.getString()));
+            return sb.toString();
+        }
+
+        public void sendInquiry(String msg) throws Exception {
+            new MessageQueue(as400, 
getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg,
+                    getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
+        }
+    };
+
+}
+
+interface Jt400ClientHelper {
+
+    void registerForRemoval(Jt400TestResource.RESOURCE_TYPE type, Object 
value);
+
+    QueuedMessage peekReplyToQueueMessage(String msg) throws Exception;
+
+    void sendInquiry(String msg) throws Exception;
+
+    //------------------- clear listeners ------------------------------
+
+    void clearAll(boolean all) throws Exception;
+
+    //----------------------- locking
+
+    void lock() throws Exception;
+
+    void unlock() throws Exception;
+
+    String dumpQueues() throws Exception;
+
+}

Reply via email to