CAMEL-10242 - added support for connection timeout

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4b545ae7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4b545ae7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4b545ae7

Branch: refs/heads/master
Commit: 4b545ae7fd407f0df755e6ea40d9657f705b8ad0
Parents: 9a5778b
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Tue Sep 6 12:11:05 2016 -0600
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Sep 6 21:22:33 2016 +0200

----------------------------------------------------------------------
 .../src/main/docs/mllp-component.adoc           |   5 +-
 .../camel/component/mllp/MllpEndpoint.java      |  49 ++++++
 .../component/mllp/MllpTcpClientProducer.java   |   4 +-
 .../component/mllp/MllpTcpServerConsumer.java   |  31 +++-
 .../camel/component/mllp/impl/MllpUtil.java     |  13 +-
 .../MllpTcpServerConsumerConnectionTest.java    | 104 ++++++++----
 ...MllpTcpServerConsumerMessageHeadersTest.java | 158 +++++++++++++++++++
 .../src/test/resources/log4j2.properties        |   4 +
 8 files changed, 327 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/docs/mllp-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc 
b/components/camel-mllp/src/main/docs/mllp-component.adoc
index a96184c..2c776b1 100644
--- a/components/camel-mllp/src/main/docs/mllp-component.adoc
+++ b/components/camel-mllp/src/main/docs/mllp-component.adoc
@@ -55,7 +55,7 @@ The MLLP component has no options.
 
 
 // endpoint options: START
-The MLLP component supports 19 endpoint options which are listed below:
+The MLLP component supports 22 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -64,6 +64,7 @@ The MLLP component supports 19 endpoint options which are 
listed below:
 | hostname | common |  | String | *Required* Hostname or IP for connection for 
the TCP connection. The default value is null which means any local IP address
 | port | common |  | int | *Required* Port number for the TCP connection
 | autoAck | common | true | boolean | Enable/Disable the automatic generation 
of a MLLP Acknowledgement MLLP Consumers only
+| hl7Headers | common | true | boolean | Enable/Disable the automatic 
generation of message headers from the HL7 Message MLLP Consumers only
 | keepAlive | common | true | boolean | Enable/disable the SO_KEEPALIVE socket 
option.
 | reuseAddress | common | false | boolean | Enable/disable the SO_REUSEADDR 
socket option.
 | tcpNoDelay | common | true | boolean | Enable/disable the TCP_NODELAY socket 
option.
@@ -79,6 +80,8 @@ The MLLP component supports 19 endpoint options which are 
listed below:
 | bindRetryInterval | timeout | 5000 | int | TCP Server Only - The number of 
milliseconds to wait between bind attempts
 | bindTimeout | timeout | 30000 | int | TCP Server Only - The number of 
milliseconds to retry binding to a server port
 | connectTimeout | timeout | 30000 | int | Timeout value for establishing for 
a TCP connection TCP Client only
+| maxReceiveTimeouts | timeout | -1 | int | The maximum number of timeouts 
(specified by receiveTimeout) allowed before the TCP Connection will be reset.
+| readTimeout | timeout | 500 | int | The SO_TIMEOUT value used after the 
start of an MLLP frame has been received
 | receiveTimeout | timeout | 10000 | int | The SO_TIMEOUT value used when 
waiting for the start of an MLLP frame
 |=======================================================================
 {% endraw %}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 6a85b9c..7a3c195 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -72,6 +72,12 @@ public class MllpEndpoint extends DefaultEndpoint {
     @UriParam(label = "timeout", defaultValue = "10000")
     int receiveTimeout = 10000;
 
+    @UriParam(label = "timeout", defaultValue = "-1")
+    int maxReceiveTimeouts = -1;
+
+    @UriParam(label = "timeout", defaultValue = "500")
+    int readTimeout = 500;
+
     @UriParam(defaultValue = "true")
     boolean keepAlive = true;
 
@@ -90,6 +96,9 @@ public class MllpEndpoint extends DefaultEndpoint {
     @UriParam(defaultValue = "true")
     boolean autoAck = true;
 
+    @UriParam(defaultValue = "true")
+    boolean hl7Headers = true;
+
     @UriParam(label = "codec")
     String charsetName;
 
@@ -261,6 +270,32 @@ public class MllpEndpoint extends DefaultEndpoint {
         this.receiveTimeout = receiveTimeout;
     }
 
+    public int getMaxReceiveTimeouts() {
+        return maxReceiveTimeouts;
+    }
+
+    /**
+     * The maximum number of timeouts (specified by receiveTimeout) allowed 
before the TCP Connection will be reset.
+     *
+     * @param maxReceiveTimeouts maximum number of receiveTimeouts
+     */
+    public void setMaxReceiveTimeouts(int maxReceiveTimeouts) {
+        this.maxReceiveTimeouts = maxReceiveTimeouts;
+    }
+
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    /**
+     * The SO_TIMEOUT value used after the start of an MLLP frame has been 
received
+     *
+     * @param readTimeout timeout in milliseconds
+     */
+    public void setReadTimeout(int readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
     public boolean isKeepAlive() {
         return keepAlive;
     }
@@ -341,4 +376,18 @@ public class MllpEndpoint extends DefaultEndpoint {
         this.autoAck = autoAck;
     }
 
+    public boolean isHl7Headers() {
+        return hl7Headers;
+    }
+
+    /**
+     * Enable/Disable the automatic generation of message headers from the HL7 
Message
+     *
+     * MLLP Consumers only
+     *
+     * @param hl7Headers enabled if true, otherwise disabled
+     */
+    public void setHl7Headers(boolean hl7Headers) {
+        this.hl7Headers = hl7Headers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
index 2e74f6c..da04286 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java
@@ -107,8 +107,8 @@ public class MllpTcpClientProducer extends DefaultProducer {
         log.debug("Reading acknowledgement from external system");
         byte[] acknowledgementBytes = null;
         try {
-            if (MllpUtil.openFrame(socket)) {
-                acknowledgementBytes = MllpUtil.closeFrame(socket);
+            if (MllpUtil.openFrame(socket, endpoint.receiveTimeout, 
endpoint.readTimeout)) {
+                acknowledgementBytes = MllpUtil.closeFrame(socket, 
endpoint.receiveTimeout, endpoint.readTimeout);
             }
         } catch (SocketTimeoutException timeoutEx) {
             exchange.setException(new 
MllpAcknowledgementTimoutException("Acknowledgement timout", timeoutEx));

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index f1fa9b7..61f29db 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -325,6 +325,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         }
     }
 
+    /**
+     * Nested Class read the Socket
+     */
     class ClientSocketThread extends Thread {
         Socket clientSocket;
         Hl7AcknowledgementGenerator acknowledgementGenerator = new 
Hl7AcknowledgementGenerator();
@@ -346,7 +349,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
             this.clientSocket.setReuseAddress(endpoint.reuseAddress);
             this.clientSocket.setSoLinger(false, -1);
 
-            // Read Timeout
+            // Initial Read Timeout
             this.clientSocket.setSoTimeout(endpoint.receiveTimeout);
 
         }
@@ -378,6 +381,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
+            int receiveTimeoutCounter = 0;
 
             while (!isInterrupted()  &&  null != clientSocket  &&  
clientSocket.isConnected()  &&  !clientSocket.isClosed()) {
                 byte[] hl7MessageBytes = null;
@@ -385,17 +389,25 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
                 log.debug("Reading data ....");
                 try {
                     if (null != initialByte && START_OF_BLOCK == initialByte) {
-                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket);
+                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout);
                     } else {
                         try {
-                            if (!MllpUtil.openFrame(clientSocket)) {
+                            if (!MllpUtil.openFrame(clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout)) {
+                                receiveTimeoutCounter = 0;
                                 continue;
+                            } else {
+                                receiveTimeoutCounter = 0;
                             }
                         } catch (SocketTimeoutException timeoutEx) {
                             // When thrown by openFrame, it indicates that no 
data was available - but no error
+                            if (endpoint.maxReceiveTimeouts > 0 && 
++receiveTimeoutCounter >= endpoint.maxReceiveTimeouts) {
+                                // TODO:  Enhance logging??
+                                log.warn("Idle Client - resetting connection");
+                                MllpUtil.resetConnection(clientSocket);
+                            }
                             continue;
                         }
-                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket);
+                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout);
                     }
                 } catch (MllpException mllpEx) {
                     Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOut);
@@ -525,6 +537,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                     MllpUtil.writeFramedPayload(clientSocket, 
acknowledgementMessageBytes);
                     exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT, 
acknowledgementMessageBytes);
                     exchange.getIn().setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, 
acknowledgementMessageType);
+                    exchange.setProperty(MLLP_ACKNOWLEDGEMENT, 
acknowledgementMessageBytes);
+                    exchange.setProperty(MLLP_ACKNOWLEDGEMENT_TYPE, 
acknowledgementMessageType);
 
                     // Check AFTER_SEND Properties
                     if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, 
boolean.class)) {
@@ -540,8 +554,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
 
             }
 
-            log.info("ClientSocketThread exiting");
-
+            log.debug("ClientSocketThread exiting");
         }
 
         private void populateHl7DataHeaders(Exchange exchange, Message 
message, byte[] hl7MessageBytes) {
@@ -567,8 +580,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
             if (-1 == endOfMSH) {
                 // TODO:  May want to throw some sort of an Exception here
                 log.error("Population of message headers failed - unable to 
find the end of the MSH segment");
-            } else {
-                log.debug("Populating the message headers");
+            } else if (endpoint.hl7Headers) {
+                log.debug("Populating the HL7 message headers");
                 Charset charset = 
Charset.forName(IOHelper.getCharsetName(exchange));
 
                 for (int i = 2; i < fieldSeparatorIndexes.size(); ++i) {
@@ -634,6 +647,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                         }
                     }
                 }
+            } else {
+                log.trace("HL7 Message headers disabled");
             }
 
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
index 2aab0a8..276d3ae 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpUtil.java
@@ -68,12 +68,13 @@ public final class MllpUtil {
      * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in 
some way
      * @throws MllpException             for other unexpected error conditions
      */
-    public static boolean openFrame(Socket socket) throws 
SocketTimeoutException, MllpCorruptFrameException, MllpException {
+    public static boolean openFrame(Socket socket, int receiveTimeout, int 
readTimeout) throws SocketTimeoutException, MllpCorruptFrameException, 
MllpException {
         if (socket.isConnected() && !socket.isClosed()) {
             InputStream socketInputStream = MllpUtil.getInputStream(socket);
 
             int readByte = -1;
             try {
+                socket.setSoTimeout(receiveTimeout);
                 readByte = socketInputStream.read();
                 switch (readByte) {
                 case START_OF_BLOCK:
@@ -110,6 +111,7 @@ public final class MllpUtil {
             outOfFrameData.write(readByte);
 
             try {
+                socket.setSoTimeout(readTimeout);
                 while (true) {
                     readByte = socketInputStream.read();
                     switch (readByte) {
@@ -183,12 +185,13 @@ public final class MllpUtil {
      * @throws MllpCorruptFrameException if the MLLP Frame is corrupted in 
some way
      * @throws MllpException             for other unexpected error conditions
      */
-    public static byte[] closeFrame(Socket socket) throws 
MllpTimeoutException, MllpCorruptFrameException, MllpException {
+    public static byte[] closeFrame(Socket socket, int receiveTimeout, int 
readTimeout) throws MllpTimeoutException, MllpCorruptFrameException, 
MllpException {
         if (socket.isConnected() && !socket.isClosed()) {
             InputStream socketInputStream = MllpUtil.getInputStream(socket);
             // TODO:  Come up with an intelligent way to size this stream
             ByteArrayOutputStream payload = new ByteArrayOutputStream(4096);
             try {
+                socket.setSoTimeout(readTimeout);
                 while (true) {
                     int readByte = socketInputStream.read();
                     switch (readByte) {
@@ -226,6 +229,7 @@ public final class MllpUtil {
                             throw new MllpCorruptFrameException("The MLLP 
frame was partially closed - END_OF_BLOCK was not followed by END_OF_DATA",
                                     payload.size() > 0 ? payload.toByteArray() 
: null);
                         }
+                        socket.setSoTimeout(receiveTimeout);
                         return payload.toByteArray();
                     default:
                         // log.trace( "Read Character: {}", (char)readByte );
@@ -263,6 +267,11 @@ public final class MllpUtil {
             }
         }
 
+        try {
+            socket.setSoTimeout(receiveTimeout);
+        } catch (SocketException e) {
+            // Eat this exception
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
index 99667b2..58b7d01 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.component.mllp;
 
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
+import java.net.SocketException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.EndpointInject;
@@ -26,25 +24,42 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
+import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
 import org.junit.Test;
 
 public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
-    String mllpHost = "localhost";
-    int mllpPort = AvailablePortFinder.getNextAvailable();
+    static final int RECEIVE_TIMEOUT = 500;
+
+    @Rule
+    public MllpClientResource mllpClient = new MllpClientResource();
+
 
     @EndpointInject(uri = "mock://result")
     MockEndpoint result;
 
     @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        mllpPort = AvailablePortFinder.getNextAvailable();
+    protected void doPreSetup() throws Exception {
+        mllpClient.setMllpHost("localhost");
+        mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable());
+
+        super.doPreSetup();
+    }
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             String routeId = "mllp-receiver";
 
             public void configure() {
-                fromF("mllp://%s:%d?autoAck=false", mllpHost, mllpPort)
+                fromF("mllp://%s:%d?autoAck=false", mllpClient.getMllpHost(), 
mllpClient.getMllpPort())
                         .log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
                         .to(result);
             }
@@ -69,33 +84,66 @@ public class MllpTcpServerConsumerConnectionTest extends 
CamelTestSupport {
      */
     @Test
     public void testConnectWithoutData() throws Exception {
-        result.setExpectedCount(0);
         int connectionCount = 10;
+        long connectionMillis = 200;
+
+        result.setExpectedCount(0);
+
+        addTestRoute(-1);
+
+        for (int i = 1; i <= connectionCount; ++i) {
+            mllpClient.connect();
+            Thread.sleep(connectionMillis);
+            mllpClient.close();
+        }
+
+        assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Simulate an Idle Client
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testIdleConnection() throws Exception {
+        final int maxReceiveTimeouts = 3;
+        String testMessage = 
"MSH|^~\\&|ADT|EPIC|JCAPS|CC|20160902123950|RISTECH|ADT^A08|00001|D|2.3|||||||" 
+ '\r' + '\n';
+
+        result.setExpectedCount(1);
+        result.setAssertPeriod(1000);
+
+        addTestRoute(maxReceiveTimeouts);
+
+        mllpClient.connect();
+        mllpClient.sendMessageAndWaitForAcknowledgement(testMessage);
+        Thread.sleep(RECEIVE_TIMEOUT * (maxReceiveTimeouts + 1));
 
-        Socket dummyLoadBalancerSocket = null;
-        SocketAddress address = new InetSocketAddress(mllpHost, mllpPort);
-        int connectTimeout = 5000;
         try {
-            for (int i = 1; i <= connectionCount; ++i) {
-                log.debug("Creating connection #{}", i);
-                dummyLoadBalancerSocket = new Socket();
-                dummyLoadBalancerSocket.connect(address, connectTimeout);
-                log.debug("Closing connection #{}", i);
-                dummyLoadBalancerSocket.close();
-                Thread.sleep(1000);
-            }
-        } finally {
-            if (null != dummyLoadBalancerSocket) {
-                try {
-                    dummyLoadBalancerSocket.close();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered closing dummy load 
balancer socket", ex);
-                }
-            }
+            mllpClient.sendMessageAndWaitForAcknowledgement(testMessage);
+            fail("The MllpClientResource should have thrown an exception when 
writing to the reset socket");
+        } catch (MllpJUnitResourceException ex) {
+            Throwable cause = ex.getCause();
+            assertIsInstanceOf(SocketException.class, cause);
+            assertEquals("Broken pipe", cause.getMessage());
         }
 
         assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
     }
 
+    void addTestRoute(int maxReceiveTimeouts) throws Exception {
+        RouteBuilder builder = new RouteBuilder() {
+            String routeId = "mllp-receiver";
+
+            public void configure() {
+                fromF("mllp://%s:%d?receiveTimeout=%d&maxReceiveTimeouts=%d", 
mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT, 
maxReceiveTimeouts)
+                        .log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
+                        .to(result);
+            }
+        };
+
+        context.addRoutes(builder);
+        context.start();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java
new file mode 100644
index 0000000..c0e54e4
--- /dev/null
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerMessageHeadersTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class MllpTcpServerConsumerMessageHeadersTest extends CamelTestSupport {
+    @Rule
+    public MllpClientResource mllpClient = new MllpClientResource();
+
+    @EndpointInject(uri = "mock://result")
+    MockEndpoint result;
+
+    @EndpointInject(uri = "mock://on-completion-result")
+    MockEndpoint onCompletionResult;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = (DefaultCamelContext) 
super.createCamelContext();
+
+        context.setUseMDCLogging(true);
+        context.setName(this.getClass().getSimpleName());
+
+        return context;
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        mllpClient.setMllpHost("localhost");
+        mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable());
+
+        super.doPreSetup();
+    }
+
+    @Test
+    public void testHl7HeadersEnabled() throws Exception {
+        String testMessage = 
"MSH|^~\\&|ADT|EPIC|JCAPS|CC|20160902123950|RISTECH|ADT^A08|00001|D|2.3|||||||" 
+ '\r' + '\n';
+
+        addTestRoute(true);
+
+        result.expectedMessageCount(1);
+
+        result.expectedHeaderReceived(MllpConstants.MLLP_SENDING_APPLICATION, 
"ADT");
+        result.expectedHeaderReceived(MllpConstants.MLLP_SENDING_FACILITY, 
"EPIC");
+        
result.expectedHeaderReceived(MllpConstants.MLLP_RECEIVING_APPLICATION, 
"JCAPS");
+        result.expectedHeaderReceived(MllpConstants.MLLP_TIMESTAMP, 
"20160902123950");
+        result.expectedHeaderReceived(MllpConstants.MLLP_SECURITY, "RISTECH");
+        result.expectedHeaderReceived(MllpConstants.MLLP_MESSAGE_TYPE, 
"ADT^A08");
+        result.expectedHeaderReceived(MllpConstants.MLLP_EVENT_TYPE, "ADT");
+        result.expectedHeaderReceived(MllpConstants.MLLP_TRIGGER_EVENT, "A08");
+        result.expectedHeaderReceived(MllpConstants.MLLP_MESSAGE_CONTROL, 
"00001");
+        result.expectedHeaderReceived(MllpConstants.MLLP_PROCESSING_ID, "D");
+        result.expectedHeaderReceived(MllpConstants.MLLP_VERSION_ID, "2.3");
+
+        mllpClient.connect();
+
+        mllpClient.sendMessageAndWaitForAcknowledgement(testMessage, 10000);
+
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+
+        Message message = result.getExchanges().get(0).getIn();
+
+        assertNotNull("Should have header" + MllpConstants.MLLP_LOCAL_ADDRESS, 
message.getHeader(MllpConstants.MLLP_LOCAL_ADDRESS));
+        assertNotNull("Should have header" + 
MllpConstants.MLLP_REMOTE_ADDRESS, 
message.getHeader(MllpConstants.MLLP_REMOTE_ADDRESS));
+    }
+
+
+    @Test
+    public void testHl7HeadersDisabled() throws Exception {
+        String testMessage = 
"MSH|^~\\&|ADT|EPIC|JCAPS|CC|20160902123950|RISTECH|ADT^A08|00001|D|2.3|||||||" 
+ '\r' + '\n';
+
+        addTestRoute(false);
+
+        result.expectedMessageCount(1);
+
+        mllpClient.connect();
+
+        mllpClient.sendMessageAndWaitForAcknowledgement(testMessage, 10000);
+
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+
+        Message message = result.getExchanges().get(0).getIn();
+
+        assertNotNull("Should have header" + MllpConstants.MLLP_LOCAL_ADDRESS, 
message.getHeader(MllpConstants.MLLP_LOCAL_ADDRESS));
+        assertNotNull("Should have header" + 
MllpConstants.MLLP_REMOTE_ADDRESS, 
message.getHeader(MllpConstants.MLLP_REMOTE_ADDRESS));
+
+        assertNull("Should NOT have header" + 
MllpConstants.MLLP_SENDING_APPLICATION, 
message.getHeader(MllpConstants.MLLP_SENDING_APPLICATION));
+        assertNull("Should NOT have header" + 
MllpConstants.MLLP_SENDING_FACILITY, 
message.getHeader(MllpConstants.MLLP_SENDING_FACILITY));
+        assertNull("Should NOT have header" + 
MllpConstants.MLLP_RECEIVING_APPLICATION, 
message.getHeader(MllpConstants.MLLP_RECEIVING_APPLICATION));
+        assertNull("Should NOT have header" + MllpConstants.MLLP_TIMESTAMP, 
message.getHeader(MllpConstants.MLLP_TIMESTAMP));
+        assertNull("Should NOT have header" + MllpConstants.MLLP_SECURITY, 
message.getHeader(MllpConstants.MLLP_SECURITY));
+        assertNull("Should NOT have header" + MllpConstants.MLLP_MESSAGE_TYPE, 
message.getHeader(MllpConstants.MLLP_MESSAGE_TYPE));
+        assertNull("Should NOT have header" + MllpConstants.MLLP_EVENT_TYPE, 
message.getHeader(MllpConstants.MLLP_EVENT_TYPE));
+        assertNull("Should NOT have header" + 
MllpConstants.MLLP_MESSAGE_CONTROL, 
message.getHeader(MllpConstants.MLLP_MESSAGE_CONTROL));
+        assertNull("Should NOT have header" + 
MllpConstants.MLLP_PROCESSING_ID, 
message.getHeader(MllpConstants.MLLP_PROCESSING_ID));
+        assertNull("Should NOT have header" + MllpConstants.MLLP_VERSION_ID, 
message.getHeader(MllpConstants.MLLP_VERSION_ID));
+    }
+
+    void addTestRoute(boolean hl7Headers) throws Exception {
+        RouteBuilder builder = new RouteBuilder() {
+            int connectTimeout = 500;
+            int responseTimeout = 5000;
+
+            @Override
+            public void configure() throws Exception {
+                String routeId = "mllp-test-receiver-route";
+
+                onCompletion()
+                        .to("mock://on-completion-result")
+                        .toF("log:%s?level=INFO&showAll=true", routeId)
+                        .log(LoggingLevel.INFO, routeId, "Test route 
complete");
+
+                
fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&hl7Headers=%b",
+                        mllpClient.getMllpHost(), mllpClient.getMllpPort(), 
connectTimeout, responseTimeout, hl7Headers)
+                        .routeId(routeId)
+                        .log(LoggingLevel.INFO, routeId, "Test route received 
message")
+                        .to(result);
+
+            }
+        };
+        context.addRoutes(builder);
+        context.start();
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/4b545ae7/components/camel-mllp/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/resources/log4j2.properties 
b/components/camel-mllp/src/test/resources/log4j2.properties
index e1d76ef..8c321d2 100644
--- a/components/camel-mllp/src/test/resources/log4j2.properties
+++ b/components/camel-mllp/src/test/resources/log4j2.properties
@@ -26,3 +26,7 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file
+
+# loggers = mllp
+# logger.mllp.name = org.apache.camel.component.mllp
+# logger.mllp.level = DEBUG

Reply via email to