Repository: camel
Updated Branches:
  refs/heads/camel-2.18.x 0ed067b1e -> ba8f3aa04
  refs/heads/master 911218438 -> 58e36d63b


http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
index fe1ad40..880761c 100644
--- 
a/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/test/junit/rule/mllp/MllpServerResource.java
@@ -19,6 +19,7 @@ package org.apache.camel.test.junit.rule.mllp;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -45,8 +46,6 @@ import static 
org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
  *
  * The server can be configured to simulate a large number
  * of error conditions.
- *
- * TODO:  This needs to be looked at - it may be orphaning threads
  */
 public class MllpServerResource extends ExternalResource {
     Logger log = LoggerFactory.getLogger(this.getClass());
@@ -59,6 +58,12 @@ public class MllpServerResource extends ExternalResource {
 
     boolean active = true;
 
+    int delayBeforeStartOfBlock;
+    int delayBeforeAcknowledgement;
+    int delayDuringAcknowledgement;
+    int delayAfterAcknowledgement;
+    int delayAfterEndOfBlock;
+
     int excludeStartOfBlockModulus;
     int excludeEndOfBlockModulus;
     int excludeEndOfDataModulus;
@@ -67,8 +72,11 @@ public class MllpServerResource extends ExternalResource {
 
     int sendOutOfBandDataModulus;
 
-    int disconnectBeforeAcknowledgementModulus;
-    int disconnectAfterAcknowledgementModulus;
+    int closeSocketBeforeAcknowledgementModulus;
+    int closeSocketAfterAcknowledgementModulus;
+
+    int resetSocketBeforeAcknowledgementModulus;
+    int resetSocketAfterAcknowledgementModulus;
 
     int sendApplicationRejectAcknowledgementModulus;
     int sendApplicationErrorAcknowledgementModulus;
@@ -76,6 +84,8 @@ public class MllpServerResource extends ExternalResource {
     Pattern sendApplicationRejectAcknowledgementPattern;
     Pattern sendApplicationErrorAcknowledgementPattern;
 
+    String acknowledgementString;
+
     ServerSocketThread serverSocketThread;
 
     public MllpServerResource() {
@@ -166,6 +176,46 @@ public class MllpServerResource extends ExternalResource {
         serverSocketThread.interrupt();
     }
 
+    public int getDelayBeforeStartOfBlock() {
+        return delayBeforeStartOfBlock;
+    }
+
+    public void setDelayBeforeStartOfBlock(int delayBeforeStartOfBlock) {
+        this.delayBeforeStartOfBlock = delayBeforeStartOfBlock;
+    }
+
+    public int getDelayBeforeAcknowledgement() {
+        return delayBeforeAcknowledgement;
+    }
+
+    public void setDelayBeforeAcknowledgement(int delayBeforeAcknowledgement) {
+        this.delayBeforeAcknowledgement = delayBeforeAcknowledgement;
+    }
+
+    public int getDelayDuringAcknowledgement() {
+        return delayDuringAcknowledgement;
+    }
+
+    public void setDelayDuringAcknowledgement(int delayDuringAcknowledgement) {
+        this.delayDuringAcknowledgement = delayDuringAcknowledgement;
+    }
+
+    public int getDelayAfterAcknowledgement() {
+        return delayAfterAcknowledgement;
+    }
+
+    public void setDelayAfterAcknowledgement(int delayAfterAcknowledgement) {
+        this.delayAfterAcknowledgement = delayAfterAcknowledgement;
+    }
+
+    public int getDelayAfterEndOfBlock() {
+        return delayAfterEndOfBlock;
+    }
+
+    public void setDelayAfterEndOfBlock(int delayAfterEndOfBlock) {
+        this.delayAfterEndOfBlock = delayAfterEndOfBlock;
+    }
+
     public boolean sendApplicationRejectAcknowledgement(String hl7Message) {
         return evaluatePatten(hl7Message, 
this.sendApplicationErrorAcknowledgementPattern);
     }
@@ -198,12 +248,20 @@ public class MllpServerResource extends ExternalResource {
         return evaluateModulus(messageCount, excludeEndOfDataModulus);
     }
 
-    public boolean disconnectBeforeAcknowledgement(int messageCount) {
-        return evaluateModulus(messageCount, 
disconnectBeforeAcknowledgementModulus);
+    public boolean closeSocketBeforeAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, 
closeSocketBeforeAcknowledgementModulus);
     }
 
-    public boolean disconnectAfterAcknowledgement(int messageCount) {
-        return evaluateModulus(messageCount, 
disconnectAfterAcknowledgementModulus);
+    public boolean closeSocketAfterAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, 
closeSocketAfterAcknowledgementModulus);
+    }
+
+    public boolean resetSocketBeforeAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, 
resetSocketBeforeAcknowledgementModulus);
+    }
+
+    public boolean resetSocketAfterAcknowledgement(int messageCount) {
+        return evaluateModulus(messageCount, 
resetSocketAfterAcknowledgementModulus);
     }
 
     public boolean sendOutOfBandData(int messageCount) {
@@ -369,27 +427,51 @@ public class MllpServerResource extends ExternalResource {
         }
     }
 
-    public int getDisconnectBeforeAcknowledgementModulus() {
-        return disconnectBeforeAcknowledgementModulus;
+    public int getCloseSocketBeforeAcknowledgementModulus() {
+        return closeSocketBeforeAcknowledgementModulus;
+    }
+
+    public void setCloseSocketBeforeAcknowledgementModulus(int 
closeSocketBeforeAcknowledgementModulus) {
+        if (0 > closeSocketBeforeAcknowledgementModulus) {
+            this.closeSocketBeforeAcknowledgementModulus = 0;
+        } else {
+            this.closeSocketBeforeAcknowledgementModulus = 
closeSocketBeforeAcknowledgementModulus;
+        }
+    }
+
+    public int getCloseSocketAfterAcknowledgementModulus() {
+        return closeSocketAfterAcknowledgementModulus;
+    }
+
+    public void setCloseSocketAfterAcknowledgementModulus(int 
closeSocketAfterAcknowledgementModulus) {
+        if (0 > closeSocketAfterAcknowledgementModulus) {
+            this.closeSocketAfterAcknowledgementModulus = 0;
+        } else {
+            this.closeSocketAfterAcknowledgementModulus = 
closeSocketAfterAcknowledgementModulus;
+        }
+    }
+
+    public int getResetSocketBeforeAcknowledgementModulus() {
+        return resetSocketBeforeAcknowledgementModulus;
     }
 
-    public void setDisconnectBeforeAcknowledgementModulus(int 
disconnectBeforeAcknowledgementModulus) {
-        if (0 > disconnectBeforeAcknowledgementModulus) {
-            this.disconnectBeforeAcknowledgementModulus = 0;
+    public void setResetSocketBeforeAcknowledgementModulus(int 
resetSocketBeforeAcknowledgementModulus) {
+        if (0 > resetSocketBeforeAcknowledgementModulus) {
+            this.resetSocketBeforeAcknowledgementModulus = 0;
         } else {
-            this.disconnectBeforeAcknowledgementModulus = 
disconnectBeforeAcknowledgementModulus;
+            this.resetSocketBeforeAcknowledgementModulus = 
resetSocketBeforeAcknowledgementModulus;
         }
     }
 
-    public int getDisconnectAfterAcknowledgementModulus() {
-        return disconnectAfterAcknowledgementModulus;
+    public int getResetSocketAfterAcknowledgementModulus() {
+        return resetSocketAfterAcknowledgementModulus;
     }
 
-    public void setDisconnectAfterAcknowledgementModulus(int 
disconnectAfterAcknowledgementModulus) {
-        if (0 > disconnectAfterAcknowledgementModulus) {
-            this.disconnectAfterAcknowledgementModulus = 0;
+    public void setResetSocketAfterAcknowledgementModulus(int 
resetSocketAfterAcknowledgementModulus) {
+        if (0 > resetSocketAfterAcknowledgementModulus) {
+            this.resetSocketAfterAcknowledgementModulus = 0;
         } else {
-            this.disconnectAfterAcknowledgementModulus = 
disconnectAfterAcknowledgementModulus;
+            this.resetSocketAfterAcknowledgementModulus = 
resetSocketAfterAcknowledgementModulus;
         }
     }
 
@@ -433,6 +515,14 @@ public class MllpServerResource extends ExternalResource {
         this.sendApplicationErrorAcknowledgementPattern = 
sendApplicationErrorAcknowledgementPattern;
     }
 
+    public String getAcknowledgementString() {
+        return acknowledgementString;
+    }
+
+    public void setAcknowledgementString(String acknowledgementString) {
+        this.acknowledgementString = acknowledgementString;
+    }
+
     public ServerSocketThread getServerSocketThread() {
         return serverSocketThread;
     }
@@ -441,41 +531,95 @@ public class MllpServerResource extends ExternalResource {
         this.serverSocketThread = serverSocketThread;
     }
 
-    void closeConnection(Socket socket) {
-        if (null != socket) {
-            if (!socket.isClosed()) {
-                try {
-                    socket.shutdownInput();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered shutting down the input 
stream on the client socket", ex);
-                }
-
-                try {
-                    socket.shutdownOutput();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered shutting down the output 
stream on the client socket", ex);
-                }
+    public void closeClientConnections() {
+        if (serverSocketThread != null) {
+            serverSocketThread.closeClientConnections();
+        }
+    }
 
-                try {
-                    socket.close();
-                } catch (Exception ex) {
-                    log.warn("Exception encountered closing the client 
socket", ex);
-                }
-            }
+    public void resetClientConnections() {
+        if (serverSocketThread != null) {
+            serverSocketThread.resetClientConnections();
         }
     }
 
-    void resetConnection(Socket socket) {
-        if (null != socket && !socket.isClosed()) {
-            try {
-                socket.setSoLinger(true, 0);
-            } catch (SocketException socketEx) {
-                log.debug("SocketException encountered setting SO_LINGER to 0 
on the socket to force a reset - ignoring", socketEx);
-            } finally {
-                closeConnection(socket);
+    /**
+     * Generates a HL7 Application Acknowledgement
+     *
+     * @param hl7Message          HL7 message that is being acknowledged
+     * @param acknowledgementCode AA, AE or AR
+     * @return a HL7 Application Acknowledgement
+     */
+    protected String generateAcknowledgement(String hl7Message, String 
acknowledgementCode) {
+        final String defaulNackMessage =
+                "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER
+                        + "MSA|AR|" + SEGMENT_DELIMITER
+                        + MESSAGE_TERMINATOR;
+
+        if (hl7Message == null) {
+            log.error("Invalid HL7 message for parsing operation. Please check 
your inputs");
+            return defaulNackMessage;
+        }
+
+        if (!("AA".equals(acknowledgementCode) || 
"AE".equals(acknowledgementCode) || "AR".equals(acknowledgementCode))) {
+            throw new IllegalArgumentException("Acknowledgemnt Code must be 
AA, AE or AR: " + acknowledgementCode);
+        }
+
+        String messageControlId;
+
+        int endOfMshSegment = hl7Message.indexOf(SEGMENT_DELIMITER);
+        if (-1 != endOfMshSegment) {
+            String mshSegment = hl7Message.substring(0, endOfMshSegment);
+            char fieldSeparator = mshSegment.charAt(3);
+            String fieldSeparatorPattern = Pattern.quote("" + fieldSeparator);
+            String[] mshFields = mshSegment.split(fieldSeparatorPattern);
+            if (mshFields.length == 0) {
+                log.error("Failed to split MSH Segment into fields");
+            } else {
+                StringBuilder ackBuilder = new 
StringBuilder(mshSegment.length() + 25);
+                // Build the MSH Segment
+                ackBuilder
+                        .append(mshFields[0]).append(fieldSeparator)
+                        .append(mshFields[1]).append(fieldSeparator)
+                        .append(mshFields[4]).append(fieldSeparator)
+                        .append(mshFields[5]).append(fieldSeparator)
+                        .append(mshFields[2]).append(fieldSeparator)
+                        .append(mshFields[3]).append(fieldSeparator)
+                        .append(mshFields[6]).append(fieldSeparator)
+                        .append(mshFields[7]).append(fieldSeparator)
+                        .append("ACK")
+                        .append(mshFields[8].substring(3));
+                for (int i = 9; i < mshFields.length; ++i) {
+                    ackBuilder.append(fieldSeparator).append(mshFields[i]);
+                }
+                // Empty fields at the end are not preserved by String.split, 
so preserve them
+                int emptyFieldIndex = mshSegment.length() - 1;
+                if (fieldSeparator == mshSegment.charAt(mshSegment.length() - 
1)) {
+                    ackBuilder.append(fieldSeparator);
+                    while (emptyFieldIndex >= 1 && 
mshSegment.charAt(emptyFieldIndex) == mshSegment.charAt(emptyFieldIndex - 1)) {
+                        ackBuilder.append(fieldSeparator);
+                        --emptyFieldIndex;
+                    }
+                }
+                ackBuilder.append(SEGMENT_DELIMITER);
+
+                // Build the MSA Segment
+                ackBuilder
+                        .append("MSA").append(fieldSeparator)
+                        .append(acknowledgementCode).append(fieldSeparator)
+                        .append(mshFields[9]).append(fieldSeparator)
+                        .append(SEGMENT_DELIMITER);
+
+                // Terminate the message
+                ackBuilder.append(MESSAGE_TERMINATOR);
+
+                return ackBuilder.toString();
             }
+        } else {
+            log.error("Failed to find the end of the  MSH Segment");
         }
 
+        return null;
     }
 
     /**
@@ -566,6 +710,22 @@ public class MllpServerResource extends ExternalResource {
             log.info("Opened TCP Listener on port {}", 
serverSocket.getLocalPort());
         }
 
+        void closeClientConnections() {
+            if (clientSocketThreads != null) {
+                for (ClientSocketThread clientSocketThread : 
clientSocketThreads) {
+                    clientSocketThread.closeConnection();
+                }
+            }
+        }
+
+        void resetClientConnections() {
+            if (clientSocketThreads != null) {
+                for (ClientSocketThread clientSocketThread : 
clientSocketThreads) {
+                    clientSocketThread.resetConnection();
+                }
+            }
+        }
+
         /**
          * Accept TCP connections and create ClientSocketThreads for them
          */
@@ -586,7 +746,16 @@ public class MllpServerResource extends ExternalResource {
                     if (null == clientSocket) {
                         continue;
                     } else if (!clientSocket.isClosed()) {
-                        resetConnection(clientSocket);
+                        try {
+                            clientSocket.setSoLinger(true, 0);
+                        } catch (SocketException soLingerEx) {
+                            log.warn("Ignoring SocketException encountered 
when setting SO_LINGER in preparation of resetting client Socket", soLingerEx);
+                        }
+                        try {
+                            clientSocket.close();
+                        } catch (IOException ioEx) {
+                            log.warn("Ignoring IOException encountered when 
resetting client Socket", ioEx);
+                        }
                         continue;
                     } else {
                         throw new MllpJUnitResourceException("Unexpected 
SocketException encountered accepting client connection", socketEx);
@@ -685,6 +854,10 @@ public class MllpServerResource extends ExternalResource {
             }
             super.interrupt();
         }
+
+        public void close() {
+
+        }
     }
 
     /**
@@ -711,6 +884,35 @@ public class MllpServerResource extends ExternalResource {
             this.clientSocket = clientSocket;
         }
 
+        void closeConnection() {
+            if (clientSocket != null && !clientSocket.isClosed()) {
+                try {
+                    clientSocket.close();
+                } catch (IOException ioEx) {
+                    log.warn("Ignoring IOException encountered when closing 
client Socket", ioEx);
+                } finally {
+                    clientSocket = null;
+                }
+            }
+        }
+
+        void resetConnection() {
+            if (clientSocket != null && !clientSocket.isClosed()) {
+                try {
+                    clientSocket.setSoLinger(true, 0);
+                } catch (SocketException socketEx) {
+                    log.warn("Ignoring SocketException encountered when 
setting SO_LINGER in preparation of resetting client Socket", socketEx);
+                }
+                try {
+                    clientSocket.close();
+                } catch (IOException ioEx) {
+                    log.warn("Ignoring IOException encountered when resetting 
client Socket", ioEx);
+                } finally {
+                    clientSocket = null;
+                }
+            }
+        }
+
         /**
          * Receives HL7 messages and replies with HL7 Acknowledgements.
          *
@@ -738,12 +940,30 @@ public class MllpServerResource extends ExternalResource {
                     } catch (Exception unexpectedEx) {
                         throw new MllpJUnitResourceException("Unexpected 
exception encounted getting input stream", unexpectedEx);
                     }
-                    String parsedHL7Message = getMessage(instream);
+                    String parsedHL7Message;
+                    try {
+                        parsedHL7Message = getMessage(instream);
+                    } catch (SocketTimeoutException timeoutEx) {
+                        log.info("Waiting for message from client");
+                        continue;
+                    }
 
                     if (null != parsedHL7Message && parsedHL7Message.length() 
> 0) {
                         ++messageCounter;
-                        if (disconnectBeforeAcknowledgement(messageCounter)) {
-                            log.warn("Disconnecting before sending 
acknowledgement");
+                        if (closeSocketBeforeAcknowledgement(messageCounter)) {
+                            log.warn("Closing socket before sending 
acknowledgement");
+                            clientSocket.shutdownInput();
+                            clientSocket.shutdownOutput();
+                            clientSocket.close();
+                            break;
+                        }
+                        if (resetSocketBeforeAcknowledgement(messageCounter)) {
+                            log.warn("Resetting socket before sending 
acknowledgement");
+                            try {
+                                clientSocket.setSoLinger(true, 0);
+                            } catch (IOException ioEx) {
+                                log.warn("Ignoring IOException encountered 
setting SO_LINGER when prepareing to reset socket", ioEx);
+                            }
                             clientSocket.shutdownInput();
                             clientSocket.shutdownOutput();
                             clientSocket.close();
@@ -752,39 +972,67 @@ public class MllpServerResource extends ExternalResource {
 
                         String acknowledgmentMessage;
 
-                        if 
(sendApplicationErrorAcknowledgement(messageCounter) || 
sendApplicationErrorAcknowledgement(parsedHL7Message)) {
-                            acknowledgmentMessage = 
generateAcknowledgementMessage(parsedHL7Message, "AE");
-                        } else if 
(sendApplicationRejectAcknowledgement(messageCounter) || 
sendApplicationRejectAcknowledgement(parsedHL7Message)) {
-                            acknowledgmentMessage = 
generateAcknowledgementMessage(parsedHL7Message, "AR");
+                        if (acknowledgementString == null) {
+                            if 
(sendApplicationErrorAcknowledgement(messageCounter) || 
sendApplicationErrorAcknowledgement(parsedHL7Message)) {
+                                acknowledgmentMessage = 
generateAcknowledgementMessage(parsedHL7Message, "AE");
+                            } else if 
(sendApplicationRejectAcknowledgement(messageCounter) || 
sendApplicationRejectAcknowledgement(parsedHL7Message)) {
+                                acknowledgmentMessage = 
generateAcknowledgementMessage(parsedHL7Message, "AR");
+                            } else {
+                                acknowledgmentMessage = 
generateAcknowledgementMessage(parsedHL7Message);
+                            }
                         } else {
-                            acknowledgmentMessage = 
generateAcknowledgementMessage(parsedHL7Message);
-
+                            acknowledgmentMessage = acknowledgementString;
                         }
+
                         BufferedOutputStream outstream = new 
BufferedOutputStream(clientSocket.getOutputStream());
 
                         if (sendOutOfBandData(messageCounter)) {
                             byte[] outOfBandDataBytes = "Out Of Band 
Hl7MessageGenerator".getBytes();
                             outstream.write(outOfBandDataBytes, 0, 
outOfBandDataBytes.length);
-
                         }
+
                         if (excludeStartOfBlock(messageCounter)) {
-                            log.warn("NOT sending 
bMLLP_ENVELOPE_START_OF_BLOCK");
+                            log.warn("NOT sending START_OF_BLOCK");
                         } else {
                             outstream.write(START_OF_BLOCK);
+                            if (delayBeforeStartOfBlock > 0) {
+                                uncheckedSleep(delayBeforeStartOfBlock);
+                                uncheckedFlush(outstream);
+                            }
                         }
 
                         if (excludeAcknowledgement(messageCounter)) {
                             log.info("NOT sending Acknowledgement body");
                         } else {
+                            if (delayBeforeAcknowledgement > 0) {
+                                uncheckedSleep(delayBeforeAcknowledgement);
+                            }
                             log.debug("Buffering Acknowledgement\n\t{}", 
acknowledgmentMessage.replace('\r', '\n'));
                             byte[] ackBytes = acknowledgmentMessage.getBytes();
-                            outstream.write(ackBytes, 0, ackBytes.length);
+                            if (delayDuringAcknowledgement > 0) {
+                                int firstHalf = ackBytes.length / 2;
+                                outstream.write(ackBytes, 0, firstHalf);
+                                uncheckedFlush(outstream);
+                                uncheckedSleep(delayDuringAcknowledgement);
+                                outstream.write(ackBytes, firstHalf, 
ackBytes.length - firstHalf);
+                                uncheckedFlush(outstream);
+                            } else {
+                                outstream.write(ackBytes, 0, ackBytes.length);
+                            }
+                            if (delayAfterAcknowledgement > 0) {
+                                uncheckedFlush(outstream);
+                                uncheckedSleep(delayAfterAcknowledgement);
+                            }
                         }
 
                         if (excludeEndOfBlock(messageCounter)) {
                             log.warn("NOT sending 
bMLLP_ENVELOPE_END_OF_BLOCK");
                         } else {
                             outstream.write(END_OF_BLOCK);
+                            if (delayAfterEndOfBlock > 0) {
+                                uncheckedFlush(outstream);
+                                uncheckedSleep(delayAfterEndOfBlock);
+                            }
                         }
 
                         if (excludeEndOfData(messageCounter)) {
@@ -794,9 +1042,9 @@ public class MllpServerResource extends ExternalResource {
                         }
 
                         log.debug("Writing Acknowledgement\n\t{}", 
acknowledgmentMessage.replace('\r', '\n'));
-                        outstream.flush();
+                        uncheckedFlush(outstream);
 
-                        if (disconnectAfterAcknowledgement(messageCounter)) {
+                        if (closeSocketAfterAcknowledgement(messageCounter)) {
                             log.info("Closing Client");
                             clientSocket.shutdownInput();
                             clientSocket.shutdownOutput();
@@ -806,16 +1054,18 @@ public class MllpServerResource extends ExternalResource 
{
                     }
                 }
             } catch (IOException e) {
-                String errorMessage = "Error whiling reading and writing to 
clientSocket";
+                String errorMessage = "Error while reading and writing from 
clientSocket";
                 log.error(errorMessage, e);
                 throw new MllpJUnitResourceException(errorMessage, e);
             } finally {
-                try {
-                    clientSocket.close();
-                } catch (IOException e) {
-                    String errorMessage = "Error whiling attempting to close 
to client Socket";
-                    log.error(errorMessage, e);
-                    throw new MllpJUnitResourceException(errorMessage, e);
+                if (clientSocket != null) {
+                    try {
+                        clientSocket.close();
+                    } catch (IOException e) {
+                        String errorMessage = "Error while attempting to close 
to client Socket";
+                        log.error(errorMessage, e);
+                        throw new MllpJUnitResourceException(errorMessage, e);
+                    }
                 }
             }
 
@@ -829,10 +1079,8 @@ public class MllpServerResource extends ExternalResource {
          * @return the MLLP payload
          * @throws IOException when the underlying Java Socket calls raise 
these exceptions
          */
-        // TODO:  Enhance this to detect non-HL7 data (i.e. look for MSH after 
START_OF_BLOCK)
         public String getMessage(InputStream anInputStream) throws IOException 
{
             try {
-                // TODO:  Enhance this to read a bunch of characters and log, 
rather than log them one at a time
                 boolean waitingForStartOfBlock = true;
                 while (waitingForStartOfBlock) {
                     int potentialStartCharacter = anInputStream.read();
@@ -847,14 +1095,16 @@ public class MllpServerResource extends ExternalResource 
{
                     }
                 }
             } catch (SocketException socketEx) {
-                if (clientSocket.isClosed()) {
-                    log.info("Client socket closed while waiting for 
MLLP_ENVELOPE_START_OF_BLOCK");
-                } else if (clientSocket.isConnected()) {
-                    log.info("SocketException encountered while waiting for 
MLLP_ENVELOPE_START_OF_BLOCK");
-                    resetConnection(clientSocket);
-                } else {
-                    log.error("Unable to read from socket stream when expected 
bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx);
-                    resetConnection(clientSocket);
+                if (clientSocket != null) {
+                    if (clientSocket.isClosed()) {
+                        log.info("Client socket closed while waiting for 
MLLP_ENVELOPE_START_OF_BLOCK");
+                    } else if (clientSocket.isConnected()) {
+                        log.info("SocketException encountered while waiting 
for MLLP_ENVELOPE_START_OF_BLOCK");
+                        resetConnection();
+                    } else {
+                        log.error("Unable to read from socket stream when 
expected bMLLP_ENVELOPE_START_OF_BLOCK - resetting connection ", socketEx);
+                        resetConnection();
+                    }
                 }
                 return null;
             }
@@ -907,75 +1157,7 @@ public class MllpServerResource extends ExternalResource {
          * @return a HL7 Application Acknowledgement
          */
         private String generateAcknowledgementMessage(String hl7Message, 
String acknowledgementCode) {
-            final String defaulNackMessage =
-                    "MSH|^~\\&|||||||NACK||P|2.2" + SEGMENT_DELIMITER
-                            + "MSA|AR|" + SEGMENT_DELIMITER
-                            + MESSAGE_TERMINATOR;
-
-            if (hl7Message == null) {
-                log.error("Invalid HL7 message for parsing operation. Please 
check your inputs");
-                return defaulNackMessage;
-            }
-
-            if (!("AA".equals(acknowledgementCode) || 
"AE".equals(acknowledgementCode) || "AR".equals(acknowledgementCode))) {
-                throw new IllegalArgumentException("Acknowledgemnt Code must 
be AA, AE or AR: " + acknowledgementCode);
-            }
-
-            String messageControlId;
-
-            int endOfMshSegment = hl7Message.indexOf(SEGMENT_DELIMITER);
-            if (-1 != endOfMshSegment) {
-                String mshSegment = hl7Message.substring(0, endOfMshSegment);
-                char fieldSeparator = mshSegment.charAt(3);
-                String fieldSeparatorPattern = Pattern.quote("" + 
fieldSeparator);
-                String[] mshFields = mshSegment.split(fieldSeparatorPattern);
-                if (mshFields.length == 0) {
-                    log.error("Failed to split MSH Segment into fields");
-                } else {
-                    StringBuilder ackBuilder = new 
StringBuilder(mshSegment.length() + 25);
-                    // Build the MSH Segment
-                    ackBuilder
-                            .append(mshFields[0]).append(fieldSeparator)
-                            .append(mshFields[1]).append(fieldSeparator)
-                            .append(mshFields[4]).append(fieldSeparator)
-                            .append(mshFields[5]).append(fieldSeparator)
-                            .append(mshFields[2]).append(fieldSeparator)
-                            .append(mshFields[3]).append(fieldSeparator)
-                            .append(mshFields[6]).append(fieldSeparator)
-                            .append(mshFields[7]).append(fieldSeparator)
-                            .append("ACK")
-                            .append(mshFields[8].substring(3));
-                    for (int i = 9; i < mshFields.length; ++i) {
-                        ackBuilder.append(fieldSeparator).append(mshFields[i]);
-                    }
-                    // Empty fields at the end are not preserved by 
String.split, so preserve them
-                    int emptyFieldIndex = mshSegment.length() - 1;
-                    if (fieldSeparator == 
mshSegment.charAt(mshSegment.length() - 1)) {
-                        ackBuilder.append(fieldSeparator);
-                        while (emptyFieldIndex >= 1 && 
mshSegment.charAt(emptyFieldIndex) == mshSegment.charAt(emptyFieldIndex - 1)) {
-                            ackBuilder.append(fieldSeparator);
-                            --emptyFieldIndex;
-                        }
-                    }
-                    ackBuilder.append(SEGMENT_DELIMITER);
-
-                    // Build the MSA Segment
-                    ackBuilder
-                            .append("MSA").append(fieldSeparator)
-                            .append(acknowledgementCode).append(fieldSeparator)
-                            .append(mshFields[9]).append(fieldSeparator)
-                            .append(SEGMENT_DELIMITER);
-
-                    // Terminate the message
-                    ackBuilder.append(MESSAGE_TERMINATOR);
-
-                    return ackBuilder.toString();
-                }
-            } else {
-                log.error("Failed to find the end of the  MSH Segment");
-            }
-
-            return null;
+            return generateAcknowledgement(hl7Message, acknowledgementCode);
         }
 
         @Override
@@ -989,6 +1171,23 @@ public class MllpServerResource extends ExternalResource {
             }
             super.interrupt();
         }
+
+        private void uncheckedSleep(long milliseconds) {
+            try {
+                Thread.sleep(milliseconds);
+            } catch (InterruptedException e) {
+                log.warn("Sleep interrupted", e);
+            }
+
+        }
+
+        private void uncheckedFlush(OutputStream outputStream) {
+            try {
+                outputStream.flush();
+            } catch (IOException e) {
+                log.warn("Ignoring exception caught while flushing output 
stream", e);
+            }
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
 
b/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
new file mode 100644
index 0000000..3959b43
--- /dev/null
+++ 
b/components/camel-mllp/src/test/java/org/apache/camel/test/util/PayloadBuilder.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class PayloadBuilder {
+    ByteArrayOutputStream builderStream = new ByteArrayOutputStream();
+
+    public PayloadBuilder() {
+    }
+
+    public PayloadBuilder(byte b) throws IOException {
+        this.append(b);
+    }
+
+    public PayloadBuilder(byte[] bytes) throws IOException {
+        this.append(bytes);
+    }
+
+    public PayloadBuilder(char... chars) throws IOException {
+        this.append(chars);
+    }
+
+    public PayloadBuilder(String... strings) throws IOException {
+        this.append(strings);
+    }
+
+    public PayloadBuilder append(byte b) throws IOException {
+        builderStream.write(b);
+
+        return this;
+    }
+
+    public PayloadBuilder append(byte[] bytes) throws IOException {
+        builderStream.write(bytes);
+
+        return this;
+    }
+
+    public PayloadBuilder append(char... chars) throws IOException {
+        if (chars != null) {
+            for (char c : chars) {
+                builderStream.write(c);
+            }
+        }
+
+        return this;
+    }
+
+    public PayloadBuilder append(String... strings) throws IOException {
+        if (strings != null) {
+            for (String s : strings) {
+                builderStream.write(s.getBytes());
+            }
+        }
+
+        return this;
+    }
+
+    public PayloadBuilder append(byte[] payload, int startPosition, int 
length) throws IOException {
+        builderStream.write(payload, startPosition, length);
+
+        return this;
+    }
+
+    public byte[] build() {
+        byte[] answer = builderStream.toByteArray();
+
+        builderStream.reset();
+
+        return answer;
+    }
+
+    public static byte[] build(byte b) {
+        try {
+            return new PayloadBuilder(b).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte) failure", 
e);
+        }
+    }
+
+    public static byte[] build(byte b, byte... bytes) {
+        try {
+            return new PayloadBuilder(b).append(bytes).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte) failure", 
e);
+        }
+    }
+
+    public static byte[] build(byte[] bytes) {
+        try {
+            return new PayloadBuilder(bytes).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte[]) failure", 
e);
+        }
+    }
+
+    public static byte[] build(char c) {
+        try {
+            return new PayloadBuilder(c).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char...) 
failure", e);
+        }
+    }
+
+    public static byte[] build(char c, char... chars) {
+        try {
+            return new PayloadBuilder(c).append(chars).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char...) 
failure", e);
+        }
+    }
+
+    public static byte[] build(char[] chars) {
+        try {
+            return new PayloadBuilder(chars).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char...) 
failure", e);
+        }
+    }
+
+    public static byte[] build(String s) {
+        try {
+            return new PayloadBuilder(s).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String) failure", 
e);
+        }
+    }
+
+    public static byte[] build(String[] strings) {
+        try {
+            return new PayloadBuilder(strings).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String[]) 
failure", e);
+        }
+    }
+
+    public static byte[] build(char start, String s) {
+        try {
+            return new PayloadBuilder(start)
+                    .append(s)
+                    .build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String) failure", 
e);
+        }
+    }
+
+    public static byte[] build(char start, String s, char... end) {
+        try {
+            return new PayloadBuilder(start)
+                    .append(s)
+                    .append(end)
+                    .build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char, String, 
char...) failure", e);
+        }
+    }
+
+    public static byte[] build(char start, byte[] bytes, char... end) {
+        try {
+            return new PayloadBuilder(start)
+                    .append(bytes)
+                    .append(end).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(char, byte[], 
char...) failure", e);
+        }
+    }
+
+    public static byte[] build(String s, char... end) {
+        try {
+            return new PayloadBuilder(s)
+                    .append(end).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(String, char...) 
failure", e);
+        }
+    }
+
+    public static byte[] build(byte[] bytes, char... end) {
+        try {
+            return new PayloadBuilder(bytes)
+                    .append(end).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte[], char...) 
failure", e);
+        }
+    }
+
+    public static byte[] build(byte[] bytes, String s) {
+        try {
+            return new PayloadBuilder(bytes)
+                    .append(s).build();
+        } catch (IOException e) {
+            throw new RuntimeException("PayloadBuilder.build(byte[], String) 
failure", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/test/resources/log4j2.properties 
b/components/camel-mllp/src/test/resources/log4j2.properties
index 8c321d2..e790099 100644
--- a/components/camel-mllp/src/test/resources/log4j2.properties
+++ b/components/camel-mllp/src/test/resources/log4j2.properties
@@ -19,14 +19,16 @@ appender.file.type = File
 appender.file.name = file
 appender.file.fileName = target/camel-mllp-test.log
 appender.file.layout.type = PatternLayout
-appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.file.layout.pattern = %d %X{camel.contextId} [%t] %-5p %-30.30c{1} - 
%m%n
+
 appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file
 
-# loggers = mllp
-# logger.mllp.name = org.apache.camel.component.mllp
-# logger.mllp.level = DEBUG
+loggers = mllp
+logger.mllp.name = org.apache.camel.component.mllp
+# logger.mllp.level = TRACE

Reply via email to