http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 05b8cbe..0ec8564 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 package org.apache.camel.component.mllp;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.BindException;
@@ -29,18 +28,31 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import 
org.apache.camel.component.mllp.impl.AcknowledgmentSynchronizationAdapter;
-import org.apache.camel.component.mllp.impl.MllpUtil;
+import org.apache.camel.component.mllp.impl.Hl7Util;
+import org.apache.camel.component.mllp.impl.MllpBufferedSocketWriter;
+import org.apache.camel.component.mllp.impl.MllpSocketReader;
+import org.apache.camel.component.mllp.impl.MllpSocketUtil;
+import org.apache.camel.component.mllp.impl.MllpSocketWriter;
+import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
 import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator;
 import org.apache.camel.util.IOHelper;
+import org.slf4j.MDC;
 
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_AUTO_ACKNOWLEDGE;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_CHARSET;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_EVENT_TYPE;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_LOCAL_ADDRESS;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_MESSAGE_CONTROL;
@@ -49,6 +61,8 @@ import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_PROCESSING_ID;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_RECEIVING_APPLICATION;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_RECEIVING_FACILITY;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_REMOTE_ADDRESS;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_AFTER_SEND;
+import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_BEFORE_SEND;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_SECURITY;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_SENDING_APPLICATION;
 import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_SENDING_FACILITY;
@@ -56,16 +70,19 @@ import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_TIMESTAMP;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_TRIGGER_EVENT;
 import static org.apache.camel.component.mllp.MllpConstants.MLLP_VERSION_ID;
 import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
-import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
 
 /**
  * The MLLP consumer.
  */
 public class MllpTcpServerConsumer extends DefaultConsumer {
+    public static final int SOCKET_STARTUP_TEST_WAIT = 100;
+    public static final int SOCKET_STARTUP_TEST_READ_TIMEOUT = 250;
     ServerSocketThread serverSocketThread;
 
     List<ClientSocketThread> clientThreads = new LinkedList<>();
 
+    Hl7AcknowledgementGenerator acknowledgementGenerator = new 
Hl7AcknowledgementGenerator();
+
     private final MllpEndpoint endpoint;
 
     public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) {
@@ -191,11 +208,10 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
          * short.
          */
         public void run() {
-            log.debug("Starting acceptor thread");
+            MDC.put("camel.contextId", endpoint.getCamelContext().getName());
 
             try {
                 while (!isInterrupted()  &&  null != serverSocket && 
serverSocket.isBound()  &&  !serverSocket.isClosed()) {
-                    // TODO: Need to check maxConnections and figure out what 
to do when exceeded
                     Socket socket = null;
                     try {
                         socket = serverSocket.accept();
@@ -229,16 +245,15 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
                     /* Wait a bit and then check and see if the socket is 
really there - it could be a load balancer
                      pinging the port
                       */
-                        Thread.sleep(100);
                         if (socket.isConnected() && !socket.isClosed()) {
-                            log.debug("Socket appears to be there - check for 
available data");
+                            log.debug("Socket appears to be there - checking 
for available data in {} milliseconds", SOCKET_STARTUP_TEST_WAIT);
+                            Thread.sleep(SOCKET_STARTUP_TEST_WAIT);
+
                             InputStream inputStream;
                             try {
                                 inputStream = socket.getInputStream();
                             } catch (IOException ioEx) {
-                                // Bad Socket -
-                                log.warn("Failed to retrieve the InputStream 
for socket after the initial connection was accepted");
-                                MllpUtil.resetConnection(socket);
+                                MllpSocketUtil.reset(socket, log, "Failed to 
retrieve the InputStream for socket after the initial connection was accepted");
                                 continue;
                             }
 
@@ -251,13 +266,12 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
                             }
 
                             // The easy check failed - so trigger a blocking 
read
-                            socket.setSoTimeout(100);
+                            MllpSocketUtil.setSoTimeout(socket, 
SOCKET_STARTUP_TEST_READ_TIMEOUT, log, "Preparing to check for available data 
on component startup");
                             try {
                                 int tmpByte = inputStream.read();
-                                socket.setSoTimeout(endpoint.receiveTimeout);
                                 if (-1 == tmpByte) {
-                                    log.debug("Socket.read() returned 
END_OF_STREAM - resetting connection");
-                                    MllpUtil.resetConnection(socket);
+                                    log.debug("Check for available data failed 
- Socket.read() returned END_OF_STREAM");
+                                    MllpSocketUtil.close(socket, null, null);
                                 } else {
                                     ClientSocketThread clientThread = new 
ClientSocketThread(socket, tmpByte);
                                     clientThreads.add(clientThread);
@@ -265,10 +279,20 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
                                 }
                             } catch (SocketTimeoutException timeoutEx) {
                                 // No data, but the socket is there
-                                log.debug("No Data - but the socket is there.  
Starting ClientSocketThread");
+                                String logMessageFormat =
+                                    "Check for available data failed - 
Socket.read() timed-out after {} milliseconds."
+                                        + "  No Data - but the socket is 
there.  Starting ClientSocketThread";
+                                log.debug(logMessageFormat, 
SOCKET_STARTUP_TEST_READ_TIMEOUT);
                                 ClientSocketThread clientThread = new 
ClientSocketThread(socket, null);
                                 clientThreads.add(clientThread);
                                 clientThread.start();
+                            } catch (IOException ioEx) {
+                                log.debug("Ignoring IOException encountered 
when attempting to read a byte - connection was reset");
+                                try {
+                                    socket.close();
+                                } catch (IOException closeEx) {
+                                    log.debug("Ignoring IOException 
encountered when attempting to close the connection after the connection reset 
was detected", closeEx);
+                                }
                             }
                         }
                     } catch (SocketTimeoutException timeoutEx) {
@@ -320,10 +344,12 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
     }
 
     /**
-     * Nested Class read the Socket
+     * Nested Class reads the Socket
      */
     class ClientSocketThread extends Thread {
-        Socket clientSocket;
+        final Socket clientSocket;
+        final MllpSocketReader mllpSocketReader;
+        final MllpSocketWriter mllpSocketWriter;
 
         Integer initialByte;
 
@@ -343,8 +369,14 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
             this.clientSocket.setSoLinger(false, -1);
 
             // Initial Read Timeout
-            this.clientSocket.setSoTimeout(endpoint.receiveTimeout);
+            MllpSocketUtil.setSoTimeout(clientSocket, endpoint.receiveTimeout, 
log, "Constructing ClientSocketThread");
 
+            mllpSocketReader = new MllpSocketReader(this.clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout, false);
+            if (endpoint.bufferWrites) {
+                mllpSocketWriter = new 
MllpBufferedSocketWriter(this.clientSocket, true);
+            } else {
+                mllpSocketWriter = new MllpSocketWriter(this.clientSocket, 
true);
+            }
         }
 
         /**
@@ -375,78 +407,238 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
         @Override
         public void run() {
             int receiveTimeoutCounter = 0;
+            MDC.put("camel.contextId", endpoint.getCamelContext().getName());
 
             while (!isInterrupted()  &&  null != clientSocket  &&  
clientSocket.isConnected()  &&  !clientSocket.isClosed()) {
                 byte[] hl7MessageBytes = null;
-                // Send the message on for processing and wait for the response
-                log.debug("Reading data ....");
+
+                log.debug("Checking for data ....");
                 try {
-                    if (null != initialByte && START_OF_BLOCK == initialByte) {
-                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout);
-                    } else {
-                        try {
-                            if (!MllpUtil.openFrame(clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout)) {
-                                receiveTimeoutCounter = 0;
-                                continue;
-                            } else {
-                                receiveTimeoutCounter = 0;
-                            }
-                        } catch (SocketTimeoutException timeoutEx) {
-                            // When thrown by openFrame, it indicates that no 
data was available - but no error
-                            if (endpoint.maxReceiveTimeouts > 0 && 
++receiveTimeoutCounter >= endpoint.maxReceiveTimeouts) {
-                                // TODO:  Enhance logging??
-                                log.warn("Idle Client - resetting connection");
-                                MllpUtil.resetConnection(clientSocket);
-                            }
-                            continue;
+                    hl7MessageBytes = 
mllpSocketReader.readEnvelopedPayload(initialByte);
+                    if (hl7MessageBytes == null) {
+                        // No data received - check for max timeouts
+                        if (endpoint.maxReceiveTimeouts > 0 && 
++receiveTimeoutCounter >= endpoint.maxReceiveTimeouts) {
+                            String reasonMessage = String.format("Idle Client 
after %d receive timeouts [%d-milliseconds] - resetting connection", 
receiveTimeoutCounter, endpoint.receiveTimeout);
+                            MllpSocketUtil.reset(clientSocket, log, 
reasonMessage);
                         }
-                        hl7MessageBytes = MllpUtil.closeFrame(clientSocket, 
endpoint.receiveTimeout, endpoint.readTimeout);
+                        continue;
                     }
                 } catch (MllpException mllpEx) {
                     Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOut);
                     exchange.setException(mllpEx);
-                    return;
+                    log.warn("Exception encountered reading payload - sending 
exception to route", mllpEx);
+                    try {
+                        getProcessor().process(exchange);
+                    } catch (Exception e) {
+                        log.error("Exception encountered processing exchange 
with exception encounter reading payload", e);
+                    }
+                    continue;
                 } finally {
                     initialByte = null;
                 }
 
-                if (null == hl7MessageBytes) {
+                // Send the message on for processing and wait for the response
+                log.debug("Populating the exchange with received message");
+                Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOut);
+                try {
+                    createUoW(exchange);
+                    Message message = exchange.getIn();
+                    message.setBody(hl7MessageBytes, byte[].class);
+
+                    message.setHeader(MLLP_LOCAL_ADDRESS, 
clientSocket.getLocalAddress().toString());
+                    message.setHeader(MLLP_REMOTE_ADDRESS, 
clientSocket.getRemoteSocketAddress());
+                    message.setHeader(MLLP_AUTO_ACKNOWLEDGE, endpoint.autoAck);
+
+                    if (endpoint.validatePayload) {
+                        String exceptionMessage = 
Hl7Util.generateInvalidPayloadExceptionMessage(hl7MessageBytes);
+                        if (exceptionMessage != null) {
+                            exchange.setException(new 
MllpInvalidMessageException(exceptionMessage, hl7MessageBytes));
+                        }
+                    }
+                    populateHl7DataHeaders(exchange, message, hl7MessageBytes);
+
+                    log.debug("Calling processor");
+                    try {
+                        getProcessor().process(exchange);
+                        sendAcknowledgement(hl7MessageBytes, exchange);
+                    } catch (RuntimeException runtimeEx) {
+                        throw runtimeEx;
+                    } catch (Exception ex) {
+                        log.error("Unexpected exception processing exchange", 
ex);
+                    }
+                } catch (Exception uowEx) {
+                    // TODO:  Handle this correctly
+                    exchange.setException(uowEx);
+                    log.warn("Exception encountered creating Unit of Work - 
sending exception to route", uowEx);
+                    try {
+                        getProcessor().process(exchange);
+                    } catch (Exception e) {
+                        log.error("Exception encountered processing exchange 
with exception encountered createing Unit of Work", e);
+                    }
                     continue;
+                } finally {
+                    if (exchange != null) {
+                        doneUoW(exchange);
+                    }
                 }
 
-                log.debug("Populating the exchange with received message");
-                Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOut);
-                Message message = exchange.getIn();
-                message.setBody(hl7MessageBytes, byte[].class);
 
-                message.setHeader(MLLP_LOCAL_ADDRESS, 
clientSocket.getLocalAddress().toString());
-                message.setHeader(MLLP_REMOTE_ADDRESS, 
clientSocket.getRemoteSocketAddress());
-                message.setHeader(MLLP_AUTO_ACKNOWLEDGE, endpoint.autoAck);
+            }
 
-                populateHl7DataHeaders(exchange, message, hl7MessageBytes);
+            log.debug("ClientSocketThread exiting");
+        }
 
-                exchange.addOnCompletion(new 
AcknowledgmentSynchronizationAdapter(clientSocket, hl7MessageBytes));
+        private void sendAcknowledgement(byte[] originalHl7MessageBytes, 
Exchange exchange) {
+            log.info("sendAcknowledgement");
+
+            // Check BEFORE_SEND Properties
+            if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, 
boolean.class)) {
+                String reasonMessage = String.format("Exchange property %s is 
%b", MLLP_RESET_CONNECTION_BEFORE_SEND,  
exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, boolean.class));
+                MllpSocketUtil.reset(clientSocket, log, reasonMessage);
+                return;
+            } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, 
boolean.class)) {
+                String reasonMessage = String.format("Exchange property %s is 
%b", MLLP_CLOSE_CONNECTION_BEFORE_SEND,  
exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, boolean.class));
+                MllpSocketUtil.close(clientSocket, log, reasonMessage);
+                return;
+            }
+
+            // Find the acknowledgement body
+            // TODO:  Enhance this to say whether or not the acknowledgment is 
missing or just of an un-convertible type
+            byte[] acknowledgementMessageBytes = 
exchange.getProperty(MLLP_ACKNOWLEDGEMENT, byte[].class);
+            String acknowledgementMessageType = null;
+            if (null == acknowledgementMessageBytes) {
+                boolean autoAck = exchange.getProperty(MLLP_AUTO_ACKNOWLEDGE, 
true, boolean.class);
+                if (!autoAck) {
+                    exchange.setException(new 
MllpInvalidAcknowledgementException("Automatic Acknowledgement is disabled and 
the "
+                            + MLLP_ACKNOWLEDGEMENT + " exchange property is 
null or cannot be converted to byte[]", originalHl7MessageBytes, 
acknowledgementMessageBytes));
+                    return;
+                }
 
-                log.debug("Calling processor");
+                String acknowledgmentTypeProperty = 
exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
                 try {
-                    getProcessor().process(exchange);
-                } catch (RuntimeException runtimeEx) {
-                    throw runtimeEx;
-                } catch (Exception ex) {
-                    log.error("Unexpected exception processing exchange", ex);
-                    throw new RuntimeException("Unexpected exception 
processing exchange", ex);
+                    if (null == acknowledgmentTypeProperty) {
+                        if (null == exchange.getException()) {
+                            acknowledgementMessageType = "AA";
+                            acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes);
+                        } else {
+                            acknowledgementMessageType = "AE";
+                            acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes);
+                        }
+                    } else {
+                        switch (acknowledgmentTypeProperty) {
+                        case "AA":
+                            acknowledgementMessageType = "AA";
+                            acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes);
+                            break;
+                        case "AE":
+                            acknowledgementMessageType = "AE";
+                            acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes);
+                            break;
+                        case "AR":
+                            acknowledgementMessageType = "AR";
+                            acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationRejectAcknowledgementMessage(originalHl7MessageBytes);
+                            break;
+                        default:
+                            exchange.setException(new 
Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + 
acknowledgmentTypeProperty));
+                            return;
+                        }
+                    }
+                } catch (Hl7AcknowledgementGenerationException 
ackGenerationException) {
+                    exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, 
ackGenerationException);
+                    exchange.setException(ackGenerationException);
                 }
+            } else {
+                final byte bM = 77;
+                final byte bS = 83;
+                final byte bA = 65;
+                final byte bE = 69;
+                final byte bR = 82;
+
+                final byte fieldSeparator = originalHl7MessageBytes[3];
+                // Acknowledgment is specified in exchange property - 
determine the acknowledgement type
+                for (int i = 0; i < originalHl7MessageBytes.length; ++i) {
+                    if (SEGMENT_DELIMITER == i) {
+                        if (i + 7 < originalHl7MessageBytes.length // Make 
sure we don't run off the end of the message
+                                && bM == originalHl7MessageBytes[i + 1] && bS 
== originalHl7MessageBytes[i + 2]
+                                && bA == originalHl7MessageBytes[i + 3] && 
fieldSeparator == originalHl7MessageBytes[i + 4]) {
+                            if (fieldSeparator != originalHl7MessageBytes[i + 
7]) {
+                                log.warn("MSA-1 is longer than 2-bytes - 
ignoring trailing bytes");
+                            }
+                            // Found MSA - pull acknowledgement bytes
+                            byte[] acknowledgmentTypeBytes = new byte[2];
+                            acknowledgmentTypeBytes[0] = 
originalHl7MessageBytes[i + 5];
+                            acknowledgmentTypeBytes[1] = 
originalHl7MessageBytes[i + 6];
+                            try {
+                                acknowledgementMessageType = 
IOConverter.toString(acknowledgmentTypeBytes, exchange);
+                            } catch (IOException ioEx) {
+                                throw new RuntimeException("Failed to convert 
acknowledgement message to string", ioEx);
+                            }
 
+                            // Verify it's a valid acknowledgement code
+                            if (bA != acknowledgmentTypeBytes[0]) {
+                                switch (acknowledgementMessageBytes[1]) {
+                                case bA:
+                                case bR:
+                                case bE:
+                                    break;
+                                default:
+                                    log.warn("Invalid acknowledgement type [" 
+ acknowledgementMessageType + "] found in message - should be AA, AE or AR");
+                                }
+                            }
+
+                            // if the MLLP_ACKNOWLEDGEMENT_TYPE property is 
set on the exchange, make sure it matches
+                            String acknowledgementTypeProperty = 
exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
+                            if (null != acknowledgementTypeProperty && 
!acknowledgementTypeProperty.equals(acknowledgementMessageType)) {
+                                log.warn("Acknowledgement type found in 
message [" + acknowledgementMessageType + "] does not match "
+                                        + MLLP_ACKNOWLEDGEMENT_TYPE + " 
exchange property value [" + acknowledgementTypeProperty + "] - using value 
found in message");
+                            }
+                        }
+                    }
+                }
             }
 
-            log.debug("ClientSocketThread exiting");
+            Message message;
+            if (exchange.hasOut()) {
+                message = exchange.getOut();
+            } else {
+                message = exchange.getIn();
+            }
+            message.setHeader(MLLP_ACKNOWLEDGEMENT, 
acknowledgementMessageBytes);
+            // TODO:  Use the charset of the exchange
+            message.setHeader(MLLP_ACKNOWLEDGEMENT_STRING, new 
String(acknowledgementMessageBytes));
+            message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, 
acknowledgementMessageType);
+
+            // Send the acknowledgement
+            log.debug("Sending Acknowledgement: {}", 
MllpComponent.covertBytesToPrintFriendlyString(acknowledgementMessageBytes));
+            try {
+                
mllpSocketWriter.writeEnvelopedPayload(originalHl7MessageBytes, 
acknowledgementMessageBytes);
+            } catch (MllpException mllpEx) {
+                log.error("MLLP Acknowledgement failure: {}", mllpEx);
+                MllpAcknowledgementDeliveryException deliveryException = new 
MllpAcknowledgementDeliveryException(originalHl7MessageBytes, 
acknowledgementMessageBytes, mllpEx);
+                exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, 
deliveryException);
+                exchange.setException(deliveryException);
+            }
+
+            // Check AFTER_SEND Properties
+            if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, 
boolean.class)) {
+                String reasonMessage = String.format("Exchange property %s is 
%b", MLLP_RESET_CONNECTION_AFTER_SEND,  
exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, boolean.class));
+                MllpSocketUtil.reset(clientSocket, log, reasonMessage);
+                return;
+            } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, 
boolean.class)) {
+                String reasonMessage = String.format("Exchange property %s is 
%b", MLLP_CLOSE_CONNECTION_AFTER_SEND,  
exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, boolean.class));
+                MllpSocketUtil.reset(clientSocket, log, reasonMessage);
+            }
         }
 
         private void populateHl7DataHeaders(Exchange exchange, Message 
message, byte[] hl7MessageBytes) {
+            if (hl7MessageBytes == null ||  hl7MessageBytes.length < 8) {
+                // Not enough data to populate anything - just return
+                return;
+            }
             // Find the end of the MSH and indexes of the fields in the MSH to 
populate message headers
             final byte fieldSeparator = hl7MessageBytes[3];
             int endOfMSH = -1;
-            List<Integer> fieldSeparatorIndexes = new ArrayList<>(10);  // We 
need at least 10 fields to create the acknowledgment
+            List<Integer> fieldSeparatorIndexes = new ArrayList<>(10);  // We 
should have at least 10 fields
 
             for (int i = 0; i < hl7MessageBytes.length; ++i) {
                 if (fieldSeparator == hl7MessageBytes[i]) {
@@ -541,11 +733,7 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
         @Override
         public void interrupt() {
             if (null != clientSocket  &&  clientSocket.isConnected()  && 
!clientSocket.isClosed()) {
-                try {
-                    clientSocket.close();
-                } catch (IOException ex) {
-                    log.warn("Exception encoutered closing client Socket in 
interrupt", ex);
-                }
+                MllpSocketUtil.close(clientSocket, log, 
this.getClass().getSimpleName() + " interrupted");
             }
             super.interrupt();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
index 7c2014a..b1e4906 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTimeoutException.java
@@ -17,43 +17,45 @@
 package org.apache.camel.component.mllp;
 
 /**
- * Raised when a MLLP Producer or Consumer encounter a timeout reading a 
message
+ * Raised when a MLLP Producer or Consumer encounter a timeout reading a 
message or an acknowledgment
  */
 public class MllpTimeoutException extends MllpException {
-    private final byte[] hl7Message;
+    static final String EXCEPTION_MESSAGE = "Timeout receiving HL7 Message";
 
-    public MllpTimeoutException(String message, byte[] hl7Message) {
-        super(message);
-        this.hl7Message = hl7Message;
+    public MllpTimeoutException(byte[] partialHl7Message) {
+        super(EXCEPTION_MESSAGE, partialHl7Message);
     }
 
-    public MllpTimeoutException(String message, byte[] hl7Message, Throwable 
cause) {
-        super(message, cause);
-        this.hl7Message = hl7Message;
+    public MllpTimeoutException(String message, byte[] partialHl7Message) {
+        super(message, partialHl7Message);
     }
 
-    public byte[] getHl7Message() {
-        return hl7Message;
+    public MllpTimeoutException(byte[] partialHl7Message, Throwable cause) {
+        super(EXCEPTION_MESSAGE, partialHl7Message, cause);
     }
 
-    @Override
-    public String getMessage() {
-        if (isLogPhi()) {
-            return String.format("%s:\n\tHL7 Message: %s", super.getMessage(), 
covertBytesToPrintFriendlyString(hl7Message));
-        } else {
-            return super.getMessage();
-        }
+    public MllpTimeoutException(String message, byte[] partialHl7Message, 
Throwable cause) {
+        super(message, partialHl7Message, cause);
     }
 
-    @Override
-    public String toString() {
-        StringBuilder stringBuilder = new 
StringBuilder(this.getClass().getName());
+    protected MllpTimeoutException(String message, byte[] hl7Message, byte[] 
partialHl7Acknowledgement) {
+        super(message, hl7Message, partialHl7Acknowledgement);
+    }
 
-        stringBuilder.append(": {hl7Message=")
-                .append(covertBytesToPrintFriendlyString(hl7Message))
-                .append("}");
+    protected MllpTimeoutException(String message, byte[] hl7Message, byte[] 
partialHl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, partialHl7Acknowledgement, cause);
+    }
 
-        return stringBuilder.toString();
+    /**
+     * Get the HL7 message payload associated with this exception, if any.
+     *
+     * @return If the timeout occurred while attempting to receive an HL7 
Message, this will be null.  If the timeout
+     * occurred while attempting to receive an HL7 Acknowledgement, this will 
be the HL7 Message.  If the timeout occurred
+     * while attempting to complete the read of an HL7 message (i.e. part of 
the message has already been read), this
+     * will be the partial acknowledgement payload that was read before the 
timeout.
+     */
+    public byte[] getHl7Message() {
+        return super.getHl7Message();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
index dd5bf42..2661d7c 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpWriteException.java
@@ -20,40 +20,19 @@ package org.apache.camel.component.mllp;
  * Raised when a MLLP Producer or consumer encounter an error transmitting data
  */
 public class MllpWriteException extends MllpException {
-    private final byte[] mllpPayload;
-
-    public MllpWriteException(String message, byte[] mllpPayload) {
-        super(message);
-        this.mllpPayload = mllpPayload;
-    }
-
-    public MllpWriteException(String message, byte[] mllpPayload, Throwable 
cause) {
-        super(message, cause);
-        this.mllpPayload = mllpPayload;
+    public MllpWriteException(String message, byte[] hl7Message) {
+        super(message, hl7Message);
     }
 
-    public byte[] getMllpPayload() {
-        return mllpPayload;
+    public MllpWriteException(String message, byte[] hl7Message, byte[] 
hl7Acknowledgement) {
+        super(message, hl7Message, hl7Acknowledgement);
     }
 
-    @Override
-    public String getMessage() {
-        if (isLogPhi()) {
-            return String.format("%s:\n\tMLLP Payload: %s", 
super.getMessage(), covertBytesToPrintFriendlyString(mllpPayload));
-        } else {
-            return super.getMessage();
-        }
+    public MllpWriteException(String message, byte[] hl7Message, Throwable 
cause) {
+        super(message, hl7Message, cause);
     }
 
-    @Override
-    public String toString() {
-        StringBuilder stringBuilder = new 
StringBuilder(this.getClass().getName());
-
-        stringBuilder.append(": {mllpPayload=")
-                .append(covertBytesToPrintFriendlyString(mllpPayload))
-                .append("}");
-
-        return stringBuilder.toString();
+    public MllpWriteException(String message, byte[] hl7Message, byte[] 
hl7Acknowledgement, Throwable cause) {
+        super(message, hl7Message, hl7Acknowledgement, cause);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java
deleted file mode 100644
index c7916d4..0000000
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/AcknowledgmentSynchronizationAdapter.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mllp.impl;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Route;
-import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException;
-import org.apache.camel.component.mllp.MllpException;
-import org.apache.camel.component.mllp.MllpInvalidAcknowledgementException;
-import org.apache.camel.converter.IOConverter;
-import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerationException;
-import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_AUTO_ACKNOWLEDGE;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_AFTER_SEND;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_CLOSE_CONNECTION_BEFORE_SEND;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_AFTER_SEND;
-import static 
org.apache.camel.component.mllp.MllpConstants.MLLP_RESET_CONNECTION_BEFORE_SEND;
-import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
-
-public class AcknowledgmentSynchronizationAdapter extends 
SynchronizationAdapter {
-    Logger log = LoggerFactory.getLogger(this.getClass());
-    final byte[] originalHl7MessageBytes;
-    Hl7AcknowledgementGenerator acknowledgementGenerator = new 
Hl7AcknowledgementGenerator();
-    private Socket clientSocket;
-
-    public AcknowledgmentSynchronizationAdapter(Socket clientSocket, byte[] 
hl7MessageBytes) {
-        this.clientSocket = clientSocket;
-        this.originalHl7MessageBytes = hl7MessageBytes;
-    }
-
-    @Override
-    public int getOrder() {
-        return HIGHEST;
-    }
-
-    @Override
-    public void onAfterRoute(Route route, Exchange exchange) {
-        log.info("onAfterRoute");
-
-        // Check BEFORE_SEND Properties
-        if (exchange.getProperty(MLLP_RESET_CONNECTION_BEFORE_SEND, 
boolean.class)) {
-            MllpUtil.resetConnection(clientSocket);
-            return;
-        } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_BEFORE_SEND, 
boolean.class)) {
-            MllpUtil.closeConnection(clientSocket);
-            return;
-        }
-
-        // Find the acknowledgement body
-        // TODO:  Enhance this to say whether or not the acknowledgment is 
missing or just of an unconvertible type
-        byte[] acknowledgementMessageBytes = 
exchange.getProperty(MLLP_ACKNOWLEDGEMENT, byte[].class);
-        String acknowledgementMessageType = null;
-        if (null == acknowledgementMessageBytes) {
-            boolean autoAck = exchange.getProperty(MLLP_AUTO_ACKNOWLEDGE, 
true, boolean.class);
-            if (!autoAck) {
-                exchange.setException(new 
MllpInvalidAcknowledgementException("Automatic Acknowledgement is disabled and 
the "
-                        + MLLP_ACKNOWLEDGEMENT + " exchange property is null 
or cannot be converted to byte[]", originalHl7MessageBytes, 
acknowledgementMessageBytes));
-                return;
-            }
-
-            String acknowledgmentTypeProperty = 
exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
-            try {
-                if (null == acknowledgmentTypeProperty) {
-                    if (null == exchange.getException()) {
-                        acknowledgementMessageType = "AA";
-                        acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes);
-                    } else {
-                        acknowledgementMessageType = "AE";
-                        acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes);
-                    }
-                } else {
-                    switch (acknowledgmentTypeProperty) {
-                    case "AA":
-                        acknowledgementMessageType = "AA";
-                        acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationAcceptAcknowledgementMessage(originalHl7MessageBytes);
-                        break;
-                    case "AE":
-                        acknowledgementMessageType = "AE";
-                        acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationErrorAcknowledgementMessage(originalHl7MessageBytes);
-                        break;
-                    case "AR":
-                        acknowledgementMessageType = "AR";
-                        acknowledgementMessageBytes = 
acknowledgementGenerator.generateApplicationRejectAcknowledgementMessage(originalHl7MessageBytes);
-                        break;
-                    default:
-                        exchange.setException(new 
Hl7AcknowledgementGenerationException("Unsupported acknowledgment type: " + 
acknowledgmentTypeProperty));
-                        return;
-                    }
-                }
-            } catch (Hl7AcknowledgementGenerationException 
ackGenerationException) {
-                exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, 
ackGenerationException);
-                exchange.setException(ackGenerationException);
-            }
-        } else {
-            final byte bM = 77;
-            final byte bS = 83;
-            final byte bA = 65;
-            final byte bE = 69;
-            final byte bR = 82;
-
-            final byte fieldSeparator = originalHl7MessageBytes[3];
-            // Acknowledgment is specified in exchange property - determine 
the acknowledgement type
-            for (int i = 0; i < originalHl7MessageBytes.length; ++i) {
-                if (SEGMENT_DELIMITER == i) {
-                    if (i + 7 < originalHl7MessageBytes.length // Make sure we 
don't run off the end of the message
-                            && bM == originalHl7MessageBytes[i + 1] && bS == 
originalHl7MessageBytes[i + 2]
-                            && bA == originalHl7MessageBytes[i + 3] && 
fieldSeparator == originalHl7MessageBytes[i + 4]) {
-                        if (fieldSeparator != originalHl7MessageBytes[i + 7]) {
-                            log.warn("MSA-1 is longer than 2-bytes - ignoring 
trailing bytes");
-                        }
-                        // Found MSA - pull acknowledgement bytes
-                        byte[] acknowledgmentTypeBytes = new byte[2];
-                        acknowledgmentTypeBytes[0] = originalHl7MessageBytes[i 
+ 5];
-                        acknowledgmentTypeBytes[1] = originalHl7MessageBytes[i 
+ 6];
-                        try {
-                            acknowledgementMessageType = 
IOConverter.toString(acknowledgmentTypeBytes, exchange);
-                        } catch (IOException ioEx) {
-                            throw new RuntimeException("Failed to convert 
acknowledgement message to string", ioEx);
-                        }
-
-                        // Verify it's a valid acknowledgement code
-                        if (bA != acknowledgmentTypeBytes[0]) {
-                            switch (acknowledgementMessageBytes[1]) {
-                            case bA:
-                            case bR:
-                            case bE:
-                                break;
-                            default:
-                                log.warn("Invalid acknowledgement type [" + 
acknowledgementMessageType + "] found in message - should be AA, AE or AR");
-                            }
-                        }
-
-                        // if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on 
the exchange, make sure it matches
-                        String acknowledgementTypeProperty = 
exchange.getProperty(MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
-                        if (null != acknowledgementTypeProperty && 
!acknowledgementTypeProperty.equals(acknowledgementMessageType)) {
-                            log.warn("Acknowledgement type found in message [" 
+ acknowledgementMessageType + "] does not match "
-                                    + MLLP_ACKNOWLEDGEMENT_TYPE + " exchange 
property value [" + acknowledgementTypeProperty + "] - using value found in 
message");
-                        }
-                    }
-                }
-            }
-        }
-
-        Message message;
-        if (exchange.hasOut()) {
-            message = exchange.getOut();
-        } else {
-            message = exchange.getIn();
-        }
-        message.setHeader(MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes);
-        message.setHeader(MLLP_ACKNOWLEDGEMENT_TYPE, 
acknowledgementMessageType);
-
-        // Send the acknowledgement
-        log.debug("Sending Acknowledgement");
-        try {
-            MllpUtil.writeFramedPayload(clientSocket, 
acknowledgementMessageBytes);
-        } catch (MllpException mllpEx) {
-            log.error("MLLP Acknowledgement failure: {}", mllpEx);
-            MllpAcknowledgementDeliveryException deliveryException = new 
MllpAcknowledgementDeliveryException(originalHl7MessageBytes, 
acknowledgementMessageBytes, mllpEx);
-            exchange.setProperty(MLLP_ACKNOWLEDGEMENT_EXCEPTION, 
deliveryException);
-            exchange.setException(deliveryException);
-        }
-
-        // Check AFTER_SEND Properties
-        if (exchange.getProperty(MLLP_RESET_CONNECTION_AFTER_SEND, 
boolean.class)) {
-            MllpUtil.resetConnection(clientSocket);
-            return;
-        } else if (exchange.getProperty(MLLP_CLOSE_CONNECTION_AFTER_SEND, 
boolean.class)) {
-            MllpUtil.closeConnection(clientSocket);
-        }
-
-        super.onAfterRoute(route, exchange);
-    }
-
-    @Override
-    public void onComplete(Exchange exchange) {
-        log.info("onComplete");
-        super.onComplete(exchange);
-
-    }
-
-    @Override
-    public void onFailure(Exchange exchange) {
-        log.warn("onFailure");
-        super.onFailure(exchange);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java
new file mode 100644
index 0000000..0c762cb
--- /dev/null
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/Hl7Util.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.MESSAGE_TERMINATOR;
+import static org.apache.camel.component.mllp.MllpEndpoint.SEGMENT_DELIMITER;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+public final class Hl7Util {
+    static final Logger LOG = LoggerFactory.getLogger(Hl7Util.class);
+
+    private Hl7Util() {
+    }
+
+    public static String generateInvalidPayloadExceptionMessage(final byte[] 
hl7Bytes) {
+        if (hl7Bytes == null) {
+            return "HL7 payload is null";
+        }
+
+        return generateInvalidPayloadExceptionMessage(hl7Bytes, 
hl7Bytes.length);
+    }
+
+    /**
+     * Verifies that the HL7 payload array
+     * <p>
+     * The MLLP protocol does not allow embedded START_OF_BLOCK or 
END_OF_BLOCK characters.  The END_OF_DATA character
+     * is allowed (and expected) because it is also the segment delimiter for 
an HL7 message
+     *
+     * @param hl7Bytes the HL7 payload to validate
+     * @return If the payload is invalid, an error message suitable for 
inclusion in an exception is returned.  If
+     * the payload is valid, null is returned;
+     */
+    public static String generateInvalidPayloadExceptionMessage(final byte[] 
hl7Bytes, final int length) {
+        if (hl7Bytes == null) {
+            return "HL7 payload is null";
+        }
+
+        if (hl7Bytes.length <= 0) {
+            return "HL7 payload is empty";
+        }
+
+        if (length > hl7Bytes.length) {
+            LOG.warn("The length specified for the HL7 payload array <{}> is 
greater than the actual length of the array <{}> - only validating {} bytes", 
length, hl7Bytes.length, length);
+        }
+
+        if (hl7Bytes.length < 3 || hl7Bytes[0] != 'M' || hl7Bytes[1] != 'S' || 
hl7Bytes[2] != 'H') {
+            return String.format("The first segment of the HL7 payload {%s} is 
not an MSH segment", new String(hl7Bytes, 0, Math.min(3, hl7Bytes.length)));
+        }
+
+        int validationLength = Math.min(length, hl7Bytes.length);
+
+        if (hl7Bytes[validationLength - 2] != SEGMENT_DELIMITER || 
hl7Bytes[validationLength - 1] != MESSAGE_TERMINATOR) {
+            String format = "The HL7 payload terminating bytes [%#x, %#x] are 
incorrect - expected [%#x, %#x]  {ASCII [<CR>, <LF>]}";
+            return String.format(format, hl7Bytes[validationLength - 2], 
hl7Bytes[validationLength - 1], (byte) SEGMENT_DELIMITER, (byte) 
MESSAGE_TERMINATOR);
+        }
+
+        for (int i = 0; i < validationLength; ++i) {
+            switch (hl7Bytes[i]) {
+            case START_OF_BLOCK:
+                return String.format("HL7 payload contains an embedded 
START_OF_BLOCK {%#x, ASCII <VT>} at index %d", hl7Bytes[i], i);
+            case END_OF_BLOCK:
+                return String.format("HL7 payload contains an embedded 
END_OF_BLOCK {%#x, ASCII <FS>} at index %d", hl7Bytes[i], i);
+            default:
+                // continue on
+            }
+        }
+
+        return null;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java
new file mode 100644
index 0000000..46db872
--- /dev/null
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpBufferedSocketWriter.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException;
+import org.apache.camel.component.mllp.MllpException;
+import org.apache.camel.component.mllp.MllpWriteException;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+public class MllpBufferedSocketWriter extends MllpSocketWriter {
+    static final int DEFAULT_SO_SNDBUF = 65535;
+
+    ByteArrayOutputStream outputBuffer;
+
+    public MllpBufferedSocketWriter(Socket socket, boolean 
acknowledgementWriter) {
+        super(socket, acknowledgementWriter);
+        try {
+            outputBuffer = new 
ByteArrayOutputStream(socket.getSendBufferSize());
+        } catch (SocketException socketEx) {
+            log.warn(String.format("Ignoring exception encountered retrieving 
SO_SNDBUF from the socket - using default size of %d bytes", 
DEFAULT_SO_SNDBUF), socketEx);
+            outputBuffer = new ByteArrayOutputStream(DEFAULT_SO_SNDBUF);
+        }
+    }
+
+    @Override
+    public void writeEnvelopedPayload(byte[] hl7MessageBytes, byte[] 
hl7AcknowledgementBytes) throws MllpException {
+        if (socket == null) {
+            final String errorMessage = "Socket is null";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        } else if (!socket.isConnected()) {
+            final String errorMessage = "Socket is not connected";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        } else if (socket.isClosed()) {
+            final String errorMessage = "Socket is closed";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        }
+
+        OutputStream socketOutputStream = null;
+        try {
+            socketOutputStream = socket.getOutputStream();
+        } catch (IOException e) {
+            final String errorMessage = "Failed to retrieve the OutputStream 
from the Socket";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes, 
hl7AcknowledgementBytes);
+            }
+        }
+
+        outputBuffer.write(START_OF_BLOCK);
+
+        if (isAcknowledgementWriter()) {
+            if (hl7AcknowledgementBytes == null) {
+                log.warn("HL7 Acknowledgement payload is null - sending empty 
MLLP payload");
+            } else if (hl7AcknowledgementBytes.length <= 0) {
+                log.warn("HL7 Acknowledgement payload is empty - sending empty 
MLLP payload");
+            } else {
+                outputBuffer.write(hl7AcknowledgementBytes, 0, 
hl7AcknowledgementBytes.length);
+            }
+        } else {
+            if (hl7MessageBytes == null) {
+                log.warn("HL7 Message payload is null - sending empty MLLP 
payload");
+            } else if (hl7MessageBytes.length <= 0) {
+                log.warn("HL7 Message payload is empty - sending empty MLLP 
payload");
+            } else {
+                outputBuffer.write(hl7MessageBytes, 0, hl7MessageBytes.length);
+            }
+        }
+
+        outputBuffer.write(END_OF_BLOCK);
+        outputBuffer.write(END_OF_DATA);
+
+        try {
+            outputBuffer.writeTo(socketOutputStream);
+            socketOutputStream.flush();
+        } catch (IOException e) {
+            final String errorMessage = "Failed to write the MLLP payload to 
the Socket's OutputStream";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        } finally {
+            outputBuffer.reset();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java
new file mode 100644
index 0000000..7d10cce
--- /dev/null
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketReader.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+import org.apache.camel.component.mllp.MllpAcknowledgementTimeoutException;
+import org.apache.camel.component.mllp.MllpComponent;
+import org.apache.camel.component.mllp.MllpException;
+import org.apache.camel.component.mllp.MllpReceiveAcknowledgementException;
+import org.apache.camel.component.mllp.MllpReceiveException;
+import org.apache.camel.component.mllp.MllpTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+public class MllpSocketReader {
+    final Socket socket;
+    final int receiveTimeout;
+    final int readTimeout;
+    final boolean acknowledgementReader;
+    Logger log = LoggerFactory.getLogger(this.getClass());
+    byte[] receiveBuffer;
+    ByteArrayOutputStream readAdditionalStream;
+
+    public MllpSocketReader(Socket socket, int receiveTimeout, int 
readTimeout, boolean acknowledgementReader) {
+        this.socket = socket;
+        this.receiveTimeout = receiveTimeout;
+        this.readTimeout = readTimeout;
+        this.acknowledgementReader = acknowledgementReader;
+        try {
+            receiveBuffer = new byte[socket.getReceiveBufferSize()];
+        } catch (SocketException socketEx) {
+            throw new IllegalStateException("Cannot retrieve the value of 
SO_RCVBUF from the Socket", socketEx);
+        }
+    }
+
+
+    public byte[] readEnvelopedPayload() throws MllpException {
+        return readEnvelopedPayload(null, null);
+    }
+
+    public byte[] readEnvelopedPayload(byte[] hl7MessageBytes) throws 
MllpException {
+        return readEnvelopedPayload(null, hl7MessageBytes);
+    }
+
+    public byte[] readEnvelopedPayload(Integer initialByte) throws 
MllpException {
+        return readEnvelopedPayload(initialByte, null);
+    }
+
+    protected byte[] readEnvelopedPayload(Integer initialByte, byte[] 
hl7MessageBytes) throws MllpException {
+        byte[] answer = null;
+
+        MllpSocketUtil.setSoTimeout(socket, receiveTimeout, log, "Preparing to 
receive payload");
+
+        InputStream socketInputStream = null;
+        try {
+            socketInputStream = socket.getInputStream();
+        } catch (IOException ioEx) {
+            final String errorMessage = "Failed to retrieve the InputStream 
from the Socket";
+            resetConnection(errorMessage);
+            throw isAcknowledgementReader()
+                    ? new MllpReceiveAcknowledgementException(errorMessage, 
hl7MessageBytes, ioEx)
+                    : new MllpReceiveException(errorMessage, ioEx);
+        }
+
+        // Read the acknowledgment - hopefully in one shot
+        int readCount;
+        int startPosition = (initialByte != null && initialByte == 
START_OF_BLOCK) ? 0 : -1;
+        do { // Read from the socket until the beginning of a MLLP payload is 
found or a timeout occurs
+            try {
+                readCount = socketInputStream.read(receiveBuffer);
+                if (readCount == -1) {
+                    String errorMessage = "END_OF_STREAM encountered while 
attempting to receive payload - was Socket closed?";
+                    resetConnection(errorMessage);
+                    throw isAcknowledgementReader()
+                            ? new 
MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes)
+                            : new MllpReceiveException(errorMessage);
+                } else if (log.isTraceEnabled()) {
+                    log.trace("Received bytes: {}", 
MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, readCount));
+                }
+            } catch (SocketTimeoutException timeoutEx) {
+                if (isAcknowledgementReader()) {
+                    throw new 
MllpAcknowledgementTimeoutException(hl7MessageBytes, timeoutEx);
+                } else {
+                    if (initialByte != null && initialByte == START_OF_BLOCK) {
+                        answer = new byte[1];
+                        answer[0] = initialByte.byteValue();
+                        throw new MllpTimeoutException(answer, timeoutEx);
+                    }
+
+                    return null;
+                }
+            } catch (IOException ioEx) {
+                String errorMessage = "Error receiving payload";
+                log.error(errorMessage, ioEx);
+                resetConnection(errorMessage);
+                throw isAcknowledgementReader()
+                        ? new 
MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, ioEx)
+                        : new MllpReceiveException(errorMessage, ioEx);
+            }
+
+            if (readCount > 0) {  // If some data was read, make sure we found 
the beginning of the message
+                if (initialByte != null && initialByte == START_OF_BLOCK) {
+                    startPosition = 0;
+                } else {
+                    int startOfBlock = 
MllpSocketUtil.findStartOfBlock(receiveBuffer, readCount);
+                    startPosition = (startOfBlock == -1) ? -1 : startOfBlock + 
1;
+                }
+                if (startPosition > 1) {
+                    // Some out-of-band data was received - log it
+                    final String format = "Ignoring {} out-of-band bytes 
received before the beginning of the payload";
+                    int length = readCount - startPosition - 1;
+                    if (MllpComponent.isLogPhi()) {
+                        log.warn(format + ": {}", length, 
MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, length));
+                    } else {
+                        log.warn(format, length);
+                    }
+                }
+            }
+        } while (startPosition == -1);
+
+        // Check to see if the payload is complete
+        int endPosition = MllpSocketUtil.findEndOfMessage(receiveBuffer, 
readCount);
+
+        if (endPosition != -1) {
+            // We have a complete payload - build the result without delimiters
+            if (endPosition < readCount - 3) {
+                // Some out-of-band data was received - log it
+                final String format = "Ignoring {} out-of-band bytes received 
after the end of the payload";
+                int length = readCount - endPosition - 2;
+                if (MllpComponent.isLogPhi()) {
+                    log.warn(format + ": {}", length, 
MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, endPosition + 1, 
length));
+                } else {
+                    log.warn(format, length);
+                }
+            }
+
+            // Build the answer
+            int length = endPosition - startPosition;
+            answer = new byte[length];
+            System.arraycopy(receiveBuffer, startPosition, answer, 0, length);
+        } else {
+            // The payload is incomplete - read it all before returning
+
+            // Write the data already received to the overflow stream, without 
the beginning delimiters
+            getReadAdditionalStream().reset();
+            readAdditionalStream.write(receiveBuffer, startPosition, readCount 
- startPosition);
+
+            // We've already received some data, so switch to the read timeout
+            MllpSocketUtil.setSoTimeout(socket, readTimeout, log, "Preparing 
to continue reading payload");
+
+            // Now the current data is in the overflow stream, continue 
reading until the end of the payload is found or a timeout occurs
+            endPosition = -1;
+            do { // Read from the socket until the end of the MLLP payload is 
found or a timeout occurs
+                try {
+                    readCount = socketInputStream.read(receiveBuffer);
+                    if (readCount == -1) {
+                        String errorMessage = "END_OF_STREAM encountered while 
attempting to read the end of the payload - Socket was closed or reset";
+                        resetConnection(errorMessage);
+                        byte[] partialPayload = (readAdditionalStream.size() > 
0) ? readAdditionalStream.toByteArray() : null;
+                        throw isAcknowledgementReader()
+                                ? new 
MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, 
partialPayload)
+                                : new MllpReceiveException(errorMessage, 
partialPayload);
+                    } else if (log.isTraceEnabled()) {
+                        log.trace("Read additional bytes: {}", 
MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, readCount));
+                    }
+                } catch (SocketTimeoutException timeoutEx) {
+                    String errorMessage = "Timeout reading the end of the 
payload";
+                    resetConnection(errorMessage);
+                    byte[] partialPayload = (readAdditionalStream.size() > 0) 
? readAdditionalStream.toByteArray() : null;
+                    throw isAcknowledgementReader()
+                            ? new 
MllpAcknowledgementTimeoutException(errorMessage, hl7MessageBytes, 
partialPayload, timeoutEx)
+                            : new MllpTimeoutException(errorMessage, 
partialPayload, timeoutEx);
+                } catch (IOException ioEx) {
+                    String errorMessage = "Error reading  the end of the 
payload";
+                    resetConnection(errorMessage);
+                    log.error(errorMessage);
+                    byte[] partialPayload = (readAdditionalStream.size() > 0) 
? readAdditionalStream.toByteArray() : null;
+                    throw isAcknowledgementReader()
+                            ? new 
MllpReceiveAcknowledgementException(errorMessage, hl7MessageBytes, 
partialPayload, ioEx)
+                            : new MllpReceiveException(errorMessage, 
partialPayload, ioEx);
+                }
+                if (readCount > 0) {  // If some data was read, make sure we 
found the end of the message
+                    endPosition = 
MllpSocketUtil.findEndOfMessage(receiveBuffer, readCount);
+                    if (endPosition != -1) {
+                        if (endPosition < readCount - 2) {
+                            final String format = "Ignoring {} out-of-band 
bytes after the end of the payload";
+                            int length = readCount - endPosition - 2;
+                            if (MllpComponent.isLogPhi()) {
+                                log.warn(format + ": {}", length, 
MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, endPosition + 2, 
length));
+                            } else {
+                                log.warn(format, length);
+                            }
+                        }
+                        readAdditionalStream.write(receiveBuffer, 0, 
endPosition);
+                    } else {
+                        readAdditionalStream.write(receiveBuffer, 0, 
readCount);
+                    }
+                }
+            } while (endPosition == -1);
+
+            // All available data has been read - return the data
+            answer = readAdditionalStream.toByteArray();
+        }
+
+        // Check to see if there is any more data available
+        int availableCount;
+        do {
+            try {
+                availableCount = socketInputStream.available();
+            } catch (IOException ioEx) {
+                log.warn("Ignoring IOException encountered while checking for 
additional available trailing bytes", ioEx);
+                break;
+            }
+            if (availableCount > 0) { // if data is available, eat it
+                try {
+                    readCount = socketInputStream.read(receiveBuffer);
+                    final String format = "Ignoring {} out-of-band bytes 
trailing after the end of the payload";
+                    if (MllpComponent.isLogPhi()) {
+                        log.warn(format + ": {}", readCount, 
MllpComponent.covertBytesToPrintFriendlyString(receiveBuffer, 0, readCount));
+                    } else {
+                        log.warn(format, readCount);
+                    }
+                } catch (IOException ioEx) {
+                    log.warn(String.format("Ignoring IOException encountered 
while attempting to read %d bytes of trailing data", availableCount), ioEx);
+                    break;
+                }
+            }
+        } while (availableCount != 0);
+
+        return answer;
+    }
+
+    public void closeConnection(String reasonMessage) {
+        MllpSocketUtil.close(socket, log, reasonMessage);
+    }
+
+    public void resetConnection(String reasonMessage) {
+        MllpSocketUtil.reset(socket, log, reasonMessage);
+    }
+
+    public Socket getSocket() {
+        return socket;
+    }
+
+    public int getReceiveTimeout() {
+        return receiveTimeout;
+    }
+
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    public byte[] getReceiveBuffer() {
+        return receiveBuffer;
+    }
+
+    public boolean isAcknowledgementReader() {
+        return acknowledgementReader;
+    }
+
+    public ByteArrayOutputStream getReadAdditionalStream() {
+        if (readAdditionalStream == null) {
+            readAdditionalStream = new 
ByteArrayOutputStream(receiveBuffer.length);
+        }
+
+        return readAdditionalStream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java
new file mode 100644
index 0000000..fab33a8
--- /dev/null
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketUtil.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+public final class MllpSocketUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MllpSocketUtil.class);
+
+    private MllpSocketUtil() {
+    }
+
+    public static void setSoTimeout(Socket socket, int timeout, Logger logger, 
String reasonMessage) {
+        if (logger != null && logger.isDebugEnabled()) {
+            final String format = "Setting SO_TIMEOUT to {} for connection {}";
+            if (reasonMessage != null && !reasonMessage.isEmpty()) {
+                logger.debug(format + "  Reason: {}", timeout, 
getAddressString(socket), reasonMessage);
+            } else {
+                logger.debug(format, timeout, getAddressString(socket));
+            }
+        }
+        try {
+            socket.setSoTimeout(timeout);
+        } catch (SocketException socketEx) {
+            if (logger != null) {
+                final String format = "Ignoring SocketException encountered 
setting SO_TIMEOUT to %d for connection %s.";
+                if (reasonMessage != null && !reasonMessage.isEmpty()) {
+                    logger.warn(String.format(format + "  Reason: %s", 
timeout, getAddressString(socket), reasonMessage), socketEx);
+                } else {
+                    logger.warn(String.format(format, timeout, 
getAddressString(socket)), socketEx);
+                }
+            }
+        }
+    }
+
+    public static void close(Socket socket, Logger logger, String 
reasonMessage) {
+        if (socket != null && socket.isConnected() && !socket.isClosed()) {
+            final String format = "Closing connection {}";
+
+            String address = getAddressString(socket);
+
+            if (logger != null) {
+                if (reasonMessage != null && !reasonMessage.isEmpty()) {
+                    logger.warn(format + ".  Reason: {}", address, 
reasonMessage);
+                } else {
+                    logger.warn(format, address);
+                }
+            }
+
+            if (!socket.isInputShutdown()) {
+                try {
+                    socket.shutdownInput();
+                } catch (Exception ex) {
+                    String logMessage = String.format("Ignoring Exception 
encountered shutting down the input stream on the client socket %s", address);
+                    if (logger != null) {
+                        logger.warn(logMessage, ex);
+                    } else {
+                        LOG.warn(logMessage, ex);
+                    }
+                }
+            }
+
+            if (!socket.isOutputShutdown()) {
+                try {
+                    socket.shutdownOutput();
+                } catch (Exception ex) {
+                    String logMessage = String.format("Ignoring Exception 
encountered shutting down the output stream on the client socket %s", address);
+                    if (logger != null) {
+                        logger.warn(logMessage, ex);
+                    } else {
+                        LOG.warn(logMessage, ex);
+                    }
+                }
+            }
+
+            try {
+                socket.close();
+            } catch (IOException ioEx) {
+                String logMessage = String.format("Ignoring IOException 
encountered while closing connection %s", address);
+                if (logger != null) {
+                    logger.warn(logMessage, ioEx);
+                } else {
+                    LOG.warn(logMessage, ioEx);
+                }
+            }
+        }
+    }
+
+    public static void reset(Socket socket, Logger logger, String 
reasonMessage) {
+        if (socket != null && socket.isConnected() && !socket.isClosed()) {
+            final String format = "Resetting connection {}";
+
+            String address = getAddressString(socket);
+
+            if (logger != null) {
+                if (reasonMessage != null && !reasonMessage.isEmpty()) {
+                    logger.warn(format + ".  Reason: {}", address, 
reasonMessage);
+                } else {
+                    logger.warn(format, address);
+                }
+            }
+
+            try {
+                socket.setSoLinger(true, 0);
+            } catch (SocketException socketEx) {
+                String logMessage = String.format("Ignoring SocketException 
encountered setting SO_LINGER in preparation for resetting connection %s", 
address);
+                if (logger != null) {
+                    logger.warn(logMessage, socketEx);
+                } else {
+                    LOG.warn(logMessage, socketEx);
+                }
+            }
+
+            try {
+                socket.close();
+            } catch (IOException ioEx) {
+                String logMessage = String.format("Ignoring IOException 
encountered while resetting connection %s", address);
+                if (logger != null) {
+                    logger.warn(logMessage, ioEx);
+                } else {
+                    LOG.warn(logMessage, ioEx);
+                }
+            }
+        }
+    }
+
+    public static String getAddressString(Socket socket) {
+        String localAddressString = "null";
+        String remoteAddressString = "null";
+
+        if (socket != null) {
+            SocketAddress localSocketAddress = socket.getLocalSocketAddress();
+            if (localSocketAddress != null) {
+                localAddressString = localSocketAddress.toString();
+            }
+
+            SocketAddress remoteSocketAddress = 
socket.getRemoteSocketAddress();
+            if (remoteSocketAddress != null) {
+                remoteAddressString = remoteSocketAddress.toString();
+            }
+        }
+
+        return String.format("%s -> %s", localAddressString, 
remoteAddressString);
+    }
+
+    public static int findStartOfBlock(byte[] payload) {
+        if (payload != null) {
+            return findStartOfBlock(payload, payload.length);
+        }
+
+        return -1;
+    }
+
+    /**
+     * Find the beginning of the HL7 Payload
+     * <p>
+     * Searches the payload from the beginning, looking for the START_OF_BLOCK 
character.
+     *
+     * @param payload the payload to check
+     * @param length  the current valid length of the receive buffer
+     * @return the index of the START_OF_BLOCK, or -1 if not found
+     */
+    public static int findStartOfBlock(byte[] payload, int length) {
+        if (payload != null && length >= 0) {
+            for (int i = 0; i < Math.min(length, payload.length); ++i) {
+                if (payload[i] == START_OF_BLOCK) {
+                    return i;
+                }
+            }
+        }
+
+        return -1;
+    }
+
+    public static int findEndOfMessage(byte[] payload) {
+        if (payload != null) {
+            return findEndOfMessage(payload, payload.length);
+        }
+
+        return -1;
+    }
+
+    /**
+     * Find the end of the HL7 Payload
+     * <p>
+     * Searches the payload from the end, looking for the [END_OF_BLOCK, 
END_OF_DATA] characters.
+     *
+     * @param payload the payload to check
+     * @param length  the current valid length of the receive buffer
+     * @return the index of the END_OF_BLOCK character that terminates the 
message, or -1 if not found
+     */
+    public static int findEndOfMessage(byte[] payload, int length) {
+        if (payload != null && length >= 0) {
+            for (int i = Math.min(length, payload.length) - 1; i > 0; --i) {
+                if (payload[i] == END_OF_DATA) {
+                    if (i > 0 && payload[i - 1] == END_OF_BLOCK) {
+                        return i - 1;
+                    }
+                }
+            }
+        }
+
+        return -1;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6d58b67/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java
new file mode 100644
index 0000000..bd9515c
--- /dev/null
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/impl/MllpSocketWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mllp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.camel.component.mllp.MllpAcknowledgementDeliveryException;
+import org.apache.camel.component.mllp.MllpException;
+import org.apache.camel.component.mllp.MllpWriteException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_BLOCK;
+import static org.apache.camel.component.mllp.MllpEndpoint.END_OF_DATA;
+import static org.apache.camel.component.mllp.MllpEndpoint.START_OF_BLOCK;
+
+public class MllpSocketWriter {
+    static final byte[] PAYLOAD_TERMINATOR;
+
+    static {
+        PAYLOAD_TERMINATOR = new byte[2];
+        PAYLOAD_TERMINATOR[0] = END_OF_BLOCK;
+        PAYLOAD_TERMINATOR[1] = END_OF_DATA;
+    }
+
+    final Socket socket;
+    final boolean acknowledgementWriter;
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    public MllpSocketWriter(Socket socket, boolean acknowledgementWriter) {
+        this.socket = socket;
+        this.acknowledgementWriter = acknowledgementWriter;
+    }
+
+    public void writeEnvelopedPayload(byte[] hl7MessageBytes, byte[] 
hl7AcknowledgementBytes) throws MllpException {
+        if (socket == null) {
+            final String errorMessage = "Socket is null";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        } else if (!socket.isConnected()) {
+            final String errorMessage = "Socket is not connected";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        } else if (socket.isClosed()) {
+            final String errorMessage = "Socket is closed";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes);
+            }
+        }
+
+        OutputStream socketOutputStream = null;
+        try {
+            socketOutputStream = socket.getOutputStream();
+        } catch (IOException e) {
+            final String errorMessage = "Failed to retrieve the OutputStream 
from the Socket";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes, 
hl7AcknowledgementBytes);
+            }
+        }
+
+        try {
+            socketOutputStream.write(START_OF_BLOCK);
+        } catch (IOException e) {
+            final String errorMessage = "Failed to write the START_OF_BLOCK to 
the Socket's OutputStream";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes, 
hl7AcknowledgementBytes);
+            }
+        }
+
+        if (isAcknowledgementWriter()) {
+            if (hl7AcknowledgementBytes == null) {
+                log.warn("HL7 Acknowledgement payload is null - sending empty 
MLLP payload");
+            } else if (hl7AcknowledgementBytes.length <= 0) {
+                log.warn("HL7 Acknowledgement payload is empty - sending empty 
MLLP payload");
+            } else {
+                try {
+                    socketOutputStream.write(hl7AcknowledgementBytes);
+                } catch (IOException ioEx) {
+                    throw new MllpAcknowledgementDeliveryException("Failed to 
write the HL7 Acknowledgement payload to the Socket's OutputStream", 
hl7MessageBytes, hl7AcknowledgementBytes, ioEx);
+                }
+            }
+        } else {
+            if (hl7MessageBytes == null) {
+                log.warn("HL7 Message payload is null - sending empty MLLP 
payload");
+            } else if (hl7MessageBytes.length <= 0) {
+                log.warn("HL7 Message payload is empty - sending empty MLLP 
payload");
+            } else {
+                try {
+                    socketOutputStream.write(hl7MessageBytes);
+                } catch (IOException ioEx) {
+                    throw new MllpWriteException("Failed to write the HL7 
Message payload to the Socket's OutputStream", hl7MessageBytes, 
hl7AcknowledgementBytes, ioEx);
+                }
+            }
+        }
+
+        try {
+            socketOutputStream.write(PAYLOAD_TERMINATOR);
+            socketOutputStream.flush();
+        } catch (IOException e) {
+            final String errorMessage = "Failed to write the END_OF_BLOCK and 
END_OF_DATA to the Socket's OutputStream";
+            if (isAcknowledgementWriter()) {
+                throw new MllpAcknowledgementDeliveryException(errorMessage, 
hl7MessageBytes, hl7AcknowledgementBytes);
+            } else {
+                throw new MllpWriteException(errorMessage, hl7MessageBytes, 
hl7AcknowledgementBytes);
+            }
+        }
+
+    }
+
+    public boolean isAcknowledgementWriter() {
+        return acknowledgementWriter;
+    }
+}

Reply via email to